Move pipeline directory

This commit is contained in:
Aaron Po
2026-04-27 16:00:55 -04:00
parent 5a21589029
commit 9ed37806dd
80 changed files with 0 additions and 0 deletions

View File

@@ -0,0 +1,34 @@
skinparam shadowing false
skinparam backgroundColor #FCFCF7
skinparam defaultFontName "DM Sans"
skinparam defaultFontColor #14180C
skinparam titleFontName "Volkhov"
skinparam titleFontColor #14180C
skinparam ArrowColor #656F33
skinparam NoteBackgroundColor #DBEEDD
skinparam NoteFontColor #14180C
skinparam NoteBorderColor #4A5837
skinparam SwimlaneBorderColor #4A5837
skinparam SwimlaneBorderThickness 1
skinparam activityStartColor #EBECE3
skinparam activityEndColor #4A5837
skinparam activityStopColor #4A5837
skinparam ActivityBackgroundColor #EBECE3
skinparam ActivityBorderColor #4A5837
skinparam ActivityDiamondBackgroundColor #CBD2B5
skinparam ActivityDiamondBorderColor #4A5837
skinparam packageStyle rectangle
skinparam packageBackgroundColor #F1F3EA
skinparam packageBorderColor #4A5837
skinparam packageFontColor #14180C
skinparam classBackgroundColor #EBECE3
skinparam classBorderColor #4A5837
skinparam classFontColor #14180C
skinparam classAttributeFontColor #3F4724
skinparam classStereotypeFontColor #4A5837
skinparam interfaceBackgroundColor #DBEEDD
skinparam interfaceBorderColor #4A5837
skinparam interfaceFontColor #14180C
skinparam enumBackgroundColor #E4E6D8
skinparam enumBorderColor #4A5837
skinparam enumFontColor #14180C

View File

@@ -0,0 +1,125 @@
@startuml
skinparam style strictuml
skinparam defaultFontName "DM Sans"
skinparam defaultFontSize 14
skinparam titleFontName "Volkhov"
skinparam titleFontSize 20
skinparam backgroundColor #FAFCF9
skinparam defaultFontColor #28342A
skinparam titleFontColor #28342A
skinparam ArrowColor #628A5B
skinparam NoteBackgroundColor #EAF0E8
skinparam NoteBorderColor #547461
skinparam ActivityBackgroundColor #FAFCF9
skinparam ActivityBorderColor #547461
skinparam ActivityDiamondBackgroundColor #FAFCF9
skinparam ActivityDiamondBorderColor #628A5B
skinparam ActivityBarColor #628A5B
skinparam SwimlaneBorderColor #547461
skinparam SwimlaneBorderThickness 0.3
title The Biergarten Data Pipeline (Streaming Architecture)
|#F2F6F0|main.cc|
start
:ParseArguments(argc, argv);
if (Are arguments valid?) then (no)
:spdlog::error usage info;
stop
else (yes)
endif
:Init CurlGlobalState & LlamaBackendState;
:di::make_injector(...);
:injector.create<std::unique_ptr<BiergartenDataGenerator>>();
:BiergartenDataGenerator::Run();
|#EAF0E8|BiergartenDataGenerator|
:Initialize SQLite export;
|#E0EAE0|SqliteExportService|
:GetUtcTimestamp() from SystemDateTimeProvider;
:Initialize();
note right
Builds a fresh biergarten_seed_<UTC datetime>.sqlite filename
Appends a numeric suffix if the timestamp already exists
Opens DB Connection
Executes Schema DDL
Begins Transaction
end note
|#EAF0E8|BiergartenDataGenerator|
:QueryCitiesWithCountries();
|#E2EBDC|JsonLoader|
:JsonLoader::LoadLocations("locations.json");
:std::ranges::sample(all_locations, 50);
|#EAF0E8|BiergartenDataGenerator|
while (For each sampled Location?) is (Remaining cities)
|#DCE8D8|WikipediaService|
:GetLocationContext(loc);
:FetchExtracts(City, Country, Beer);
|#EAF0E8|BiergartenDataGenerator|
:Store EnrichedCity{Location, region_context};
endwhile (Done)
|#EAF0E8|BiergartenDataGenerator|
:GenerateBreweries(enriched_cities);
|#E5EDE1|DataGenerator|
while (For each EnrichedCity?) is (Remaining cities)
if (Generator Mode) then (MockGenerator)
:DeterministicHash & Format;
else (LlamaGenerator)
:PrepareRegionContext;
:LoadBrewerySystemPrompt("prompts/system.md");
repeat
:Infer(system_prompt, user_prompt, max_tokens, kBreweryJsonGrammar);
:ValidateBreweryJson(raw, brewery);
if (Is JSON Valid?) then (yes)
break
else (no)
:Attempt++;
endif
repeat while (Attempt < 3?) is (yes)
endif
|#EAF0E8|BiergartenDataGenerator|
if (Generation successful?) then (yes)
|#E0EAE0|SqliteExportService|
:ProcessRecord(GeneratedBrewery);
if (Location in cache?) then (yes)
:Reuse location_id;
else (no)
:Insert Location & Cache ID;
endif
:Insert Brewery (FK: location_id);
if (Exception caught during insert?) then (yes)
|#EAF0E8|BiergartenDataGenerator|
:spdlog::warn "Failed to stream record to SQLite export";
note right
Data loss is prevented per-record.
The pipeline continues running.
end note
else (no)
endif
else (no)
:spdlog::warn "Generation failed, skipping...";
endif
|#E5EDE1|DataGenerator|
endwhile (Done)
|#E0EAE0|SqliteExportService|
:Finalize();
note right
Commits Transaction
Closes Database Connection
end note
|#F2F6F0|main.cc|
:Return 0;
stop
@enduml

View File

@@ -0,0 +1,148 @@
@startuml
skinparam style strictuml
skinparam defaultFontName "DM Sans"
skinparam defaultFontSize 14
skinparam titleFontName "Volkhov"
skinparam titleFontSize 20
skinparam backgroundColor #FAFCF9
skinparam defaultFontColor #28342A
skinparam titleFontColor #28342A
skinparam ArrowColor #628A5B
skinparam class {
BackgroundColor #FAFCF9
HeaderBackgroundColor #EAF0E8
BorderColor #547461
ArrowColor #628A5B
FontColor #28342A
}
skinparam note {
BackgroundColor #EAF0E8
BorderColor #547461
FontColor #28342A
}
title The Biergarten Data Pipeline - Class Diagram
class BiergartenDataGenerator {
- context_service_ : std::unique_ptr<IEnrichmentService>
- generator_ : std::unique_ptr<DataGenerator>
- exporter_ : std::unique_ptr<IExportService>
- generated_breweries_ : std::vector<GeneratedBrewery>
+ Run() : bool
- QueryCitiesWithCountries() : std::vector<Location>
- GenerateBreweries(cities : std::span<const EnrichedCity>) : void
- LogResults() : void
}
interface IEnrichmentService <<interface>> {
+ GetLocationContext(loc : const Location&) : std::string
}
class WikipediaService {
- client_ : std::unique_ptr<WebClient>
- extract_cache_ : std::unordered_map<std::string, std::string>
+ GetLocationContext(loc : const Location&) : std::string
- FetchExtract(query : std::string_view) : std::string
}
interface WebClient <<interface>> {
+ Get(url : const std::string&) : std::string
+ UrlEncode(value : const std::string&) : std::string
}
class CURLWebClient {
+ Get(url : const std::string&) : std::string
+ UrlEncode(value : const std::string&) : std::string
}
interface DataGenerator <<interface>> {
+ GenerateBrewery(location : const Location&, region_context : const std::string&) : BreweryResult
+ GenerateUser(locale : const std::string&) : UserResult
}
class MockGenerator {
+ GenerateBrewery(...) : BreweryResult
+ GenerateUser(...) : UserResult
- DeterministicHash(location : const Location&) : size_t
}
class LlamaGenerator {
- model_ : ModelHandle
- context_ : ContextHandle
- prompt_formatter_ : std::unique_ptr<IPromptFormatter>
- rng_ : std::mt19937
+ GenerateBrewery(...) : BreweryResult
+ GenerateUser(...) : UserResult
- Load(model_path : const std::string&) : void
- Infer(...) : std::string
- InferFormatted(...) : std::string
- LoadBrewerySystemPrompt(...) : std::string
}
interface IPromptFormatter <<interface>> {
+ Format(system_prompt : std::string_view, user_prompt : std::string_view) : std::string
}
class Gemma4JinjaPromptFormatter {
+ Format(system_prompt : std::string_view, user_prompt : std::string_view) : std::string
}
class JsonLoader {
+ {static} LoadLocations(filepath : const std::filesystem::path&) : std::vector<Location>
}
interface IExportService <<interface>> {
+ Initialize() : void
+ ProcessRecord(brewery : const GeneratedBrewery&) : void
+ Finalize() : void
}
class SqliteExportService {
- date_time_provider_ : std::unique_ptr<IDateTimeProvider>
- run_timestamp_utc_ : std::string
- database_path_ : std::filesystem::path
- db_handle_ : sqlite3*
- insert_location_stmt_ : sqlite3_stmt*
- insert_brewery_stmt_ : sqlite3_stmt*
- transaction_open_ : bool
- location_cache_ : std::unordered_map<std::string, sqlite3_int64>
+ Initialize() : void
+ ProcessRecord(brewery : const GeneratedBrewery&) : void
+ Finalize() : void
- InitializeSchema() : void
}
interface IDateTimeProvider <<interface>> {
+ GetUtcTimestamp() : std::string
}
class SystemDateTimeProvider {
+ GetUtcTimestamp() : std::string
}
' Structural Relationships / Dependency Injection
BiergartenDataGenerator *-- IEnrichmentService : owns
BiergartenDataGenerator *-- DataGenerator : owns
BiergartenDataGenerator *-- IExportService : owns
IEnrichmentService <|.. WikipediaService : implements
WikipediaService *-- WebClient : owns
WebClient <|.. CURLWebClient : implements
DataGenerator <|.. MockGenerator : implements
DataGenerator <|.. LlamaGenerator : implements
LlamaGenerator *-- IPromptFormatter : uses
IPromptFormatter <|.. Gemma4JinjaPromptFormatter : implements
BiergartenDataGenerator ..> JsonLoader : uses
IExportService <|.. SqliteExportService : implements
SqliteExportService *-- IDateTimeProvider : owns
IDateTimeProvider <|.. SystemDateTimeProvider : implements
@enduml

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,360 @@
@startuml biergarten_activity
!include ../biergarten-weizen-theme.puml
skinparam defaultFontSize 13
skinparam titleFontSize 20
title The Biergarten Data Pipeline — Activity Diagram
|Main|
start
:ParseArguments(argc, argv);
if (Invalid args?) then (yes)
:spdlog::error;
stop
else (no)
endif
:Init CurlGlobalState & LlamaBackendState;
:Build DI injector;
:Initialize SqliteExportService;
note right
Opens SQLite connection.
(Transactions are now managed
per-phase via batching).
end note
:Create BoundedChannel<LogEntry> log_ch;
:Spawn Log Worker thread;
note right
Log worker drains log_ch for the
entire pipeline lifetime.
All workers emit LogEntry structs
via PipelineLogger -- never spdlog directly.
end note
:BiergartenPipelineOrchestrator::Run();
|BiergartenPipelineOrchestrator::Run()|
fork
:JsonLoader::LoadBeerStyles("beer-styles.json");
:EnrichmentService::PreWarmBeerStyleCache(beer_styles);
fork again
:JsonLoader::LoadLocations("locations.json");
:EnrichmentService::PreWarmLocationCache(sampled_locations);
end fork
fork
:JsonLoader::LoadNamesByCountry("names-by-country.json");
fork again
:JsonLoader::LoadPersonas("personas.json");
end fork
' ═══════════════════════════════════════════
' PHASE 0 — USER GENERATION
' ═══════════════════════════════════════════
|Orchestrator|
:RunUserPhase(sampled_locations);
:Create BoundedChannels\n(loc_ch, exp_ch);
fork
|Orchestrator|
:Loop: Send Locations -> loc_ch;
:Close loc_ch;
note right
Producer closes loc_ch.
LLM Worker while loop
terminates on empty + closed.
end note
fork again
|LLM Worker|
while (loc_ch has items?) is (yes)
:Receive Location;
:GetLocationContextFromCache(location);
note right
Guaranteed cache hit from startup.
end note
:IPersonaSelectionStrategy::SelectPersona(\n personas_palette_);
note right
Guaranteed cache hit from startup.
Returns a Persona struct carrying
style_affinities, abv_range,
ibu_preference, checkin_weight.
end note
:NamesByCountry::SampleName(\n location.iso3166_1);
note right
Deterministic lookup -- no LLM involved.
Name selected from pre-keyed table
and passed into the generation prompt.
end note
:GenerateUser(enriched_city, persona, sampled_name)\nvia DataGenerator;
note right
LLM receives: EnrichedCity context + persona
description + sampled name. Generates
bio and preference signals grounded
in locale and persona.
end note
:PipelineLogger::Log(Info, UserGeneration,\n city, user_id, "llm");
:Send GeneratedUser -> exp_ch;
endwhile (no)
:Close exp_ch;
note right
Producer closes exp_ch.
SQLite Worker while loop
terminates on empty + closed.
end note
fork again
|SQLite Worker|
:BEGIN TRANSACTION;
while (exp_ch has items?) is (yes)
:Receive GeneratedUser;
:ProcessUser(user);
:PipelineLogger::Log(Info, UserGeneration,\n city, user_id, "sqlite");
:Append -> user_pool_;
if (Batch size reached?) then (yes)
:COMMIT & BEGIN;
else (no)
endif
endwhile (no)
:COMMIT (Final);
end fork
|Orchestrator|
:Join LLM Worker, SQLite Worker;
' ═══════════════════════════════════════════
' PHASE 1a — BREWERY GENERATION
' ═══════════════════════════════════════════
:RunBreweryPhase(sampled_locations);
:Create BoundedChannels\n(loc_ch, exp_ch);
fork
|Orchestrator|
:Loop: Sample User from user_pool_
and pair with Location;
:Send BreweryTask(Location, User) -> loc_ch;
:Close loc_ch;
fork again
|LLM Worker|
while (loc_ch has items?) is (yes)
:Receive BreweryTask(Location, User);
:GetLocationContextFromCache(task.location);
note right
Guaranteed cache hit from startup.
end note
:GenerateBrewery(enriched_city, context, task.user)\nvia DataGenerator;
note right
KV cache stays warm.
Brewery is linked to the sampled owner_user_id.
end note
:PipelineLogger::Log(Info,\n BreweryGeneration,\n city, brewery_id, "llm");
:Send GeneratedBrewery -> exp_ch;
endwhile (no)
:Close exp_ch;
fork again
|SQLite Worker|
:BEGIN TRANSACTION;
while (exp_ch has items?) is (yes)
:Receive GeneratedBrewery;
:ProcessBrewery(brewery);
:PipelineLogger::Log(Info,\n BreweryGeneration,\n city, brewery_id, "sqlite");
:Append -> brewery_pool_;
if (Batch size reached?) then (yes)
:COMMIT & BEGIN;
else (no)
endif
endwhile (no)
:COMMIT (Final);
end fork
|Orchestrator|
:Join LLM Worker, SQLite Worker;
note right
brewery_pool_ is now fully populated.
Phase 1b may begin.
end note
' ═══════════════════════════════════════════
' PHASE 1b — BEER GENERATION
' ═══════════════════════════════════════════
:RunBeerPhase();
:Create BoundedChannels\n(brew_ch, exp_ch);
fork
|Orchestrator|
:Loop: Send Breweries -> brew_ch;
:Close brew_ch;
fork again
|LLM Worker|
while (brew_ch has items?) is (yes)
:Receive GeneratedBrewery;
:IBeerSelectionStrategy::SelectStyles(\n brewery, beer_style_palette_);
while (For each selected BeerStyle?) is (remaining)
:GetStyleContextFromCache(style);
note right
Guaranteed cache hit from startup.
KV cache stays warm across all
beer generations -- system prompt
does not change within this phase.
end note
:GenerateBeer(brewery, style_context)\nvia DataGenerator;
:Attach GeneratedBeer to bundle;
endwhile (done)
:PipelineLogger::Log(Info,\n BeerGeneration,\n city, brewery_id, "llm");
:Send BeersBundle -> exp_ch;
endwhile (no)
:Close exp_ch;
fork again
|SQLite Worker|
:BEGIN TRANSACTION;
while (exp_ch has items?) is (yes)
:Receive BeersBundle;
while (For each beer in bundle?) is (remaining)
:Set beer.brewery_id from bundle;
:ProcessBeer(beer);
:Append -> beer_pool_;
endwhile (done)
:PipelineLogger::Log(Info,\n BeerGeneration,\n city, brewery_id, "sqlite");
if (Batch size reached?) then (yes)
:COMMIT & BEGIN;
else (no)
endif
endwhile (no)
:COMMIT (Final);
end fork
|Orchestrator|
:Join LLM Worker, SQLite Worker;
note right
Both brewery_pool_ and beer_pool_
are now completely populated.
Checkin and Follow phases may
now run in parallel.
end note
' ═══════════════════════════════════════════
' PHASE 2 — CHECKIN + FOLLOW GENERATION
' (parallel — both depend only on user_pool_
' and brewery_pool_ being fully populated)
' ═══════════════════════════════════════════
fork
|Orchestrator|
:RunCheckinPhase();
:ICheckinDistributionStrategy::\nAssignActivityWeights(user_pool_);
note right
Weights seeded from each user's
persona.checkin_weight. J-curve profile
emerges from persona distribution.
end note
:BEGIN TRANSACTION;
while (For each GeneratedUser in user_pool_?) is (remaining)
:CheckinsForUser(user, brewery_pool_.size());
while (For each checkin index?) is (remaining)
:TimestampFor(user, index);
:Select brewery from brewery_pool_;
:GenerateCheckin(user, brewery, timestamp)\nvia DataGenerator;
:ProcessCheckin(checkin);
:PipelineLogger::Log(Info, CheckinGeneration,\n nullopt, checkin_id, "sqlite");
:Append -> checkin_pool_;
if (Batch size reached?) then (yes)
:COMMIT & BEGIN;
else (no)
endif
endwhile (done)
endwhile (done)
:COMMIT (Final);
fork again
|Orchestrator|
:RunFollowPhase();
:IFollowGenerationStrategy::\nAssignFollowWeights(user_pool_);
note right
For RandomFollowStrategy, weights
are uniform. For ActivityWeightedFollowStrategy,
weights derived from user.activity_weight
so high-activity users attract more followers.
end note
:BEGIN TRANSACTION;
:IFollowGenerationStrategy::\nGenerateFollows(user_pool_);
note right
Self-follow constraint (follower_id != followed_id)
enforced here and at the DB schema level.
end note
while (For each GeneratedFollow?) is (remaining)
:ProcessFollow(follow);
:PipelineLogger::Log(Info, FollowGeneration,\n nullopt, follower_id, "sqlite");
:Append -> follow_pool_;
if (Batch size reached?) then (yes)
:COMMIT & BEGIN;
else (no)
endif
endwhile (done)
:COMMIT (Final);
end fork
|Orchestrator|
:Join CheckinPhase, FollowPhase;
note right
checkin_pool_ and follow_pool_
are now fully populated.
Rating phase may begin.
end note
' ═══════════════════════════════════════════
' PHASE 3 — RATING GENERATION
' ═══════════════════════════════════════════
:RunRatingPhase();
note right
Beer selection biased by
user.persona.style_affinities and abv_range.
Rating skew modulated per persona.
end note
:BEGIN TRANSACTION;
while (For each GeneratedCheckin in checkin_pool_?) is (remaining)
:Match brewery_id, select beer from beer_pool_\n(same brewery_id, biased by persona affinities);
if (Beer exists for brewery?) then (yes)
:GenerateRating(user, beer, checkin_id)\nvia DataGenerator;
:ProcessRating(rating);
:PipelineLogger::Log(Info, RatingGeneration,\n nullopt, rating_id, "sqlite");
if (Batch size reached?) then (yes)
:COMMIT & BEGIN;
else (no)
endif
else (no)
:PipelineLogger::Log(Warn, RatingGeneration,\n nullopt, brewery_id, "sqlite");
:Skip -- brewery has no beers;
endif
endwhile (done)
:COMMIT (Final);
' ═══════════════════════════════════════════
' TEARDOWN
' ═══════════════════════════════════════════
|Orchestrator|
:Finalize SqliteExportService;
note right
Safely closes the DB connection.
end note
:Close log_ch;
|Main|
:spdlog::info "Pipeline complete in X ms";
:Join Log Worker;
note right
Drain guarantees no LogEntry is
dropped at shutdown.
end note
stop
@enduml

View File

@@ -0,0 +1,559 @@
@startuml
' ==========================================
' CONFIGURATION & STYLING
' ==========================================
!include ../biergarten-weizen-theme.puml
skinparam classAttributeFontSize 9
skinparam defaultFontSize 25
skinparam titleFontSize 30
package "Domain: Models" {
class Location {
+ city : std::string
+ state_province : std::string
+ iso3166_2 : std::string
+ country : std::string
+ iso3166_1 : std::string
+ local_languages : std::vector<std::string>
+ latitude : double
+ longitude : double
}
class LocationContext {
+ text : std::string
+ completeness : Completeness
+ char_count : size_t
}
enum Completeness {
Full
Partial
Absent
}
class EnrichedCity {
+ location : Location
+ context : LocationContext
}
class BeerStyle {
+ name : std::string
+ description : std::string
+ min_abv : float
+ max_abv : float
+ min_ibu : int
+ max_ibu : int
}
class BreweryResult {
+ name_en : std::string
+ description_en : std::string
+ name_local : std::string
+ description_local : std::string
}
class BeerResult {
+ name_en : std::string
+ description_en : std::string
+ name_local : std::string
+ description_local : std::string
+ style : std::string
+ abv : float
+ ibu : int
}
class UserResult {
+ username : std::string
+ bio : std::string
+ activity_weight : float
}
class CheckinResult {
+ checked_in_at : std::string
+ note : std::string
}
class RatingResult {
+ score : float
+ note : std::string
}
class GenerationMetadata {
+ generation_id : uint64_t
+ generated_time : std::string
+ context_provided : bool
+ generated_with : std::string
}
class GeneratedBrewery {
+ brewery_id : uint64_t
+ location : Location
+ brewery : BreweryResult
+ context_completeness : LocationContext::Completeness
+ metadata : GenerationMetadata
}
class GeneratedBeer {
+ beer_id : uint64_t
+ brewery_id : uint64_t
+ location : Location
+ style : BeerStyle
+ beer : BeerResult
+ metadata : GenerationMetadata
}
class GeneratedUser {
+ user_id : uint64_t
+ location : Location
+ user : UserResult
+ metadata : GenerationMetadata
}
class GeneratedCheckin {
+ checkin_id : uint64_t
+ user_id : uint64_t
+ brewery_id : uint64_t
+ checkin : CheckinResult
+ metadata : GenerationMetadata
}
class GeneratedRating {
+ user_id : uint64_t
+ beer_id : uint64_t
+ checkin_id : uint64_t
+ rating : RatingResult
+ metadata : GenerationMetadata
}
class GeneratedFollow {
+ follower_id : uint64_t
+ followed_id : uint64_t
+ metadata : GenerationMetadata
}
class UserPersona {
+ name: std::string
+ description: std::string
+ style_affinities: std::vector<std::string>
}
LocationContext *-- Completeness
}
package "Domain: Application Configuration"{
class SamplingOptions {
+ temperature : float = 1.0F
+ top_p : float = 0.95F
+ top_k : uint32_t = 64
+ n_ctx : uint32_t = 8192
+ seed : int = -1
}
class GeneratorOptions {
+ model_path : std::filesystem::path
+ use_mocked : bool = false
+ sampling : SamplingOptions
}
class PipelineOptions {
+ output_path : std::filesystem::path
+ log_path : std::filesystem::path
}
class ApplicationOptions {
+ generator : GeneratorOptions
+ pipeline : PipelineOptions
}
' --- Domain Model Relationships ---
ApplicationOptions *-- GeneratorOptions
ApplicationOptions *-- PipelineOptions
GeneratorOptions *-- SamplingOptions
}
package "Domain: Policy" {
interface ContextStrategy <<interface>> {
+ QueriesFor(loc : const Location&) : std::vector<std::string>
+ MaxContextChars() : size_t
}
class BreweryContextStrategy {
+ QueriesFor(loc : const Location&) : std::vector<std::string>
+ MaxContextChars() : size_t
}
class BeerContextStrategy {
+ QueriesFor(loc : const Location&) : std::vector<std::string>
+ MaxContextChars() : size_t
}
interface SamplingStrategy <<interface>> {
+ Sample(locations : const std::vector<Location>&) : std::vector<Location>
}
class UniformSamplingStrategy {
- sample_size_ : size_t
+ Sample(locations : const std::vector<Location>&) : std::vector<Location>
}
interface BeerSelectionStrategy <<interface>> {
+ SelectStyles(brewery : const GeneratedBrewery&,\n palette : std::span<const BeerStyle>) : std::vector<BeerStyle>
}
class RandomBeerSelectionStrategy {
- rng_ : std::mt19937
- min_beers_ : size_t
- max_beers_ : size_t
+ SelectStyles(brewery : const GeneratedBrewery&,\n palette : std::span<const BeerStyle>) : std::vector<BeerStyle>
}
interface CheckinDistributionStrategy <<interface>> {
+ AssignActivityWeights(users : std::vector<GeneratedUser>&) : void
+ CheckinsForUser(user : const GeneratedUser&,\n brewery_count : size_t) : size_t
+ TimestampFor(user : const GeneratedUser&,\n index : size_t) : std::string
}
class JCurveCheckinStrategy {
- rng_ : std::mt19937
+ AssignActivityWeights(users : std::vector<GeneratedUser>&) : void
+ CheckinsForUser(user : const GeneratedUser&,\n brewery_count : size_t) : size_t
+ TimestampFor(user : const GeneratedUser&,\n index : size_t) : std::string
}
class RandomCheckinStrategy {
- rng_ : std::mt19937
- min_checkins_ : size_t
- max_checkins_ : size_t
+ AssignActivityWeights(users : std::vector<GeneratedUser>&) : void
+ CheckinsForUser(user : const GeneratedUser&,\n brewery_count : size_t) : size_t
+ TimestampFor(user : const GeneratedUser&,\n index : size_t) : std::string
}
interface FollowGenerationStrategy <<interface>> {
+ GenerateFollows(users : const std::vector<GeneratedUser>&) : std::vector<GeneratedFollow>
}
class RandomFollowStrategy {
- rng_ : std::mt19937
- min_follows_ : size_t
- max_follows_ : size_t
+ GenerateFollows(users : const std::vector<GeneratedUser>&) : std::vector<GeneratedFollow>
}
class ActivityWeightedFollowStrategy {
- rng_ : std::mt19937
- min_follows_ : size_t
- max_follows_ : size_t
+ GenerateFollows(users : const std::vector<GeneratedUser>&) : std::vector<GeneratedFollow>
}
}
package "Infrastructure: Logging" {
enum LogLevel {
Debug
Info
Warn
Error
}
enum PipelinePhase {
Startup
UserGeneration
BreweryAndBeerGeneration
CheckinGeneration
RatingGeneration
FollowGeneration
Teardown
}
class LogEntry {
+ timestamp : std::chrono::system_clock::time_point
+ level : LogLevel
+ phase : PipelinePhase
+ message : std::string
+ city : std::optional<std::string>
+ entity_id : std::optional<std::string>
+ worker : std::optional<std::string>
}
interface Logger <<interface>> {
+ Log(level, phase, message,\n city, entity_id, worker) : void
}
class PipelineLogger {
- log_ch_ : BoundedChannel<LogEntry>&
+ Log(level, phase, message,\n city, entity_id, worker) : void
}
class LogWorker {
- log_ch_ : BoundedChannel<LogEntry>&
+ Run() : void
- FormatTimestamp(tp) : std::string
- ToSpdlogLevel(level) : spdlog::level::level_enum
- ToString(phase) : std::string
}
' --- Logging Relationships ---
LogEntry *-- LogLevel
LogEntry *-- PipelinePhase
PipelineLogger ..> LogEntry : emits
LogWorker ..> LogEntry : consumes
}
package "Infrastructure: Pipeline Channel" {
class "BoundedChannel<T>" as BoundedChannel {
- queue_ : std::queue<T>
- mutex_ : std::mutex
- not_full_ : std::condition_variable
- not_empty_ : std::condition_variable
- capacity_ : size_t
- closed_ : bool
+ Send(item : T) : void
+ Receive() : std::optional<T>
+ Close() : void
}
}
package "Infrastructure: Data Preloading" {
interface DataPreloader <<interface>> {
+ LoadLocations(filepath : const std::filesystem::path&) : std::vector<Location>
+ LoadBeerStyles(filepath : const std::filesystem::path&) : std::vector<BeerStyle>
+ LoadPersonas(filepath : const std::filesystem::path&) : std::vector<Persona>
+ LoadNamesByCountry(filepath : const std::filesystem::path&) : NamesByCountry
}
class JsonLoader {
+ LoadLocations(filepath : const std::filesystem::path&) : std::vector<Location>
+ LoadBeerStyles(filepath : const std::filesystem::path&) : std::vector<BeerStyle>
+ LoadPersonas(filepath : const std::filesystem::path&) : std::vector<Persona>
+ LoadNamesByCountry(filepath : const std::filesystem::path&) : NamesByCountry
}
}
package "Infrastructure: Enrichment" {
interface EnrichmentService <<interface>> {
+ GetLocationContext(loc : const Location&,\n strategy : const ContextStrategy&) : LocationContext
}
class WikipediaService {
- client_ : std::unique_ptr<WebClient>
- extract_cache_ : std::unordered_map<std::string, std::string>
+ GetLocationContext(loc : const Location&,\n strategy : const ContextStrategy&) : LocationContext
- FetchExtract(query : std::string_view) : std::string
}
interface WebClient <<interface>> {
+ Get(url : const std::string&) : std::string
+ UrlEncode(value : const std::string&) : std::string
}
class CURLWebClient {
+ Get(url : const std::string&) : std::string
+ UrlEncode(value : const std::string&) : std::string
}
}
package "Infrastructure: Data Generation" {
interface DataGenerator <<interface>> {
+ GenerateBrewery(location : const Location&,\n context : const LocationContext&) : BreweryResult
+ GenerateBeer(brewery_id : uint64_t,\n location : const Location&,\n context : const LocationContext&,\n style : const BeerStyle&) : BeerResult
+ GenerateUser(location : const Location&) : UserResult
+ GenerateCheckin(user : const GeneratedUser&,\n brewery : const GeneratedBrewery&,\n timestamp : const std::string&) : CheckinResult
+ GenerateRating(user : const GeneratedUser&,\n beer : const GeneratedBeer&,\n checkin_id : uint64_t) : RatingResult
}
class MockGenerator {
+ GenerateBrewery(...) : BreweryResult
+ GenerateBeer(...) : BeerResult
+ GenerateUser(...) : UserResult
+ GenerateCheckin(...) : CheckinResult
+ GenerateRating(...) : RatingResult
- DeterministicHash(location : const Location&) : size_t
}
class LlamaGenerator {
- model_ : ModelHandle
- context_ : ContextHandle
- prompt_formatter_ : std::unique_ptr<PromptFormatter>
- rng_ : std::mt19937
+ GenerateBrewery(...) : BreweryResult
+ GenerateBeer(...) : BeerResult
+ GenerateUser(...) : UserResult
+ GenerateCheckin(...) : CheckinResult
+ GenerateRating(...) : RatingResult
- Load(opts : const GeneratorOptions&) : void
- Infer(system_prompt, user_prompt,\n max_tokens, grammar) : std::string
- ValidateModelArchitecture() : void
}
interface PromptFormatter <<interface>> {
+ Format(system_prompt : std::string_view,\n user_prompt : std::string_view) : std::string
+ ExpectedArchitecture() : std::string_view
}
class Gemma4JinjaPromptFormatter {
+ Format(...) : std::string
+ ExpectedArchitecture() : std::string_view
}
}
package "Infrastructure: Data Export" {
interface ExportService <<interface>> {
+ Initialize() : void
+ ProcessBrewery(brewery : const GeneratedBrewery&) : uint64_t
+ ProcessBeer(beer : const GeneratedBeer&) : uint64_t
+ ProcessUser(user : const GeneratedUser&) : uint64_t
+ ProcessCheckin(checkin : const GeneratedCheckin&) : uint64_t
+ ProcessRating(rating : const GeneratedRating&) : void
+ ProcessFollow(follow : const GeneratedFollow&) : void
+ Finalize() : void
}
class SqliteExportService {
- date_time_provider_ : std::unique_ptr<DateTimeProvider>
- db_handle_ : SqliteDatabaseHandle
- insert_location_stmt_ : SqliteStatementHandle
- insert_brewery_stmt_ : SqliteStatementHandle
- insert_beer_stmt_ : SqliteStatementHandle
- insert_user_stmt_ : SqliteStatementHandle
- insert_checkin_stmt_ : SqliteStatementHandle
- insert_rating_stmt_ : SqliteStatementHandle
- insert_follow_stmt_ : SqliteStatementHandle
- transaction_open_ : bool
- location_cache_ : std::unordered_map<std::string, uint64_t>
- brewery_cache_ : std::unordered_map<std::string, uint64_t>
+ Initialize() : void
+ ProcessBrewery(brewery : const GeneratedBrewery&) : uint64_t
+ ProcessBeer(beer : const GeneratedBeer&) : uint64_t
+ ProcessUser(user : const GeneratedUser&) : uint64_t
+ ProcessCheckin(checkin : const GeneratedCheckin&) : uint64_t
+ ProcessRating(rating : const GeneratedRating&) : void
+ ProcessFollow(follow : const GeneratedFollow&) : void
+ Finalize() : void
- InitializeSchema() : void
- PrepareStatements() : void
- RollbackAndCloseNoThrow() : void
- FinalizeStatements() : void
}
interface DateTimeProvider <<interface>> {
+ GetUtcTimestamp() : std::string
}
class SystemDateTimeProvider {
+ GetUtcTimestamp() : std::string
}
}
class BiergartenPipelineOrchestrator {
- preloader_ : std::unique_ptr<DataPreloader>
- enrichment_service_ : std::unique_ptr<EnrichmentService>
- generator_ : std::unique_ptr<DataGenerator>
- logger_ : std::unique_ptr<Logger>
- exporter_ : std::unique_ptr<ExportService>
- brewery_context_strategy_ : std::unique_ptr<ContextStrategy>
- sampling_strategy_ : std::unique_ptr<SamplingStrategy>
- beer_selection_strategy_ : std::unique_ptr<BeerSelectionStrategy>
- checkin_strategy_ : std::unique_ptr<CheckinDistributionStrategy>
- follow_strategy_ : std::unique_ptr<FollowGenerationStrategy>
- beer_style_palette_ : std::vector<BeerStyle>
- options_ : ApplicationOptions
--
- user_pool_ : std::vector<GeneratedUser>
- brewery_pool_ : std::vector<GeneratedBrewery>
- beer_pool_ : std::vector<GeneratedBeer>
- checkin_pool_ : std::vector<GeneratedCheckin>
- follow_pool_ : std::vector<GeneratedFollow>
--
+ Run() : bool
- RunUserPhase(locations : const std::vector<Location>&) : void
- RunBreweryAndBeerPhase(locations : const std::vector<Location>&) : void
- RunCheckinPhase() : void
- RunRatingPhase() : void
- RunFollowPhase() : void
}
' --- Orchestration Aggregations (Services & Strategies) ---
BiergartenPipelineOrchestrator *-- DataPreloader
BiergartenPipelineOrchestrator *-- EnrichmentService
BiergartenPipelineOrchestrator *-- DataGenerator
BiergartenPipelineOrchestrator *-- ExportService
BiergartenPipelineOrchestrator *-- CheckinDistributionStrategy
BiergartenPipelineOrchestrator *-- FollowGenerationStrategy
BiergartenPipelineOrchestrator *-- SamplingStrategy
BiergartenPipelineOrchestrator *-- BeerSelectionStrategy
BiergartenPipelineOrchestrator *-- ApplicationOptions
BiergartenPipelineOrchestrator *-- Logger
' --- Orchestration Aggregations (Data Pools) ---
BiergartenPipelineOrchestrator *-- "0..*" GeneratedUser : user_pool_
BiergartenPipelineOrchestrator *-- "0..*" GeneratedBrewery : brewery_pool_
BiergartenPipelineOrchestrator *-- "0..*" GeneratedBeer : beer_pool_
BiergartenPipelineOrchestrator *-- "0..*" GeneratedCheckin : checkin_pool_
BiergartenPipelineOrchestrator *-- "0..*" GeneratedFollow : follow_pool_
' --- Interfaces & Implementations ---
DataPreloader <|.. JsonLoader
Logger <|.. PipelineLogger
ContextStrategy <|.. BreweryContextStrategy
ContextStrategy <|.. BeerContextStrategy
SamplingStrategy <|.. UniformSamplingStrategy
BeerSelectionStrategy <|.. RandomBeerSelectionStrategy
CheckinDistributionStrategy <|.. JCurveCheckinStrategy
CheckinDistributionStrategy <|.. RandomCheckinStrategy
FollowGenerationStrategy <|.. RandomFollowStrategy
FollowGenerationStrategy <|.. ActivityWeightedFollowStrategy
EnrichmentService <|.. WikipediaService
WebClient <|.. CURLWebClient
DataGenerator <|.. MockGenerator
DataGenerator <|.. LlamaGenerator
PromptFormatter <|.. Gemma4JinjaPromptFormatter
ExportService <|.. SqliteExportService
DateTimeProvider <|.. SystemDateTimeProvider
' --- Service Compositions & Dependencies ---
WikipediaService *-- WebClient
WikipediaService ..> ContextStrategy
LlamaGenerator *-- PromptFormatter
LlamaGenerator ..> GeneratorOptions
SqliteExportService *-- DateTimeProvider
' --- Cross-Component Aggregations (Held References) ---
PipelineLogger o-- BoundedChannel : logs to
LogWorker o-- BoundedChannel : drains from
' --- Domain Containment ---
EnrichedCity *-- Location
EnrichedCity *-- LocationContext
GeneratedBrewery *-- Location
GeneratedBrewery *-- BreweryResult
GeneratedBrewery *-- GenerationMetadata
GeneratedBeer *-- Location
GeneratedBeer *-- BeerStyle
GeneratedBeer *-- BeerResult
GeneratedBeer *-- GenerationMetadata
GeneratedUser *-- Location
GeneratedUser *-- UserResult
GeneratedUser *-- GenerationMetadata
GeneratedCheckin *-- CheckinResult
GeneratedCheckin *-- GenerationMetadata
GeneratedRating *-- RatingResult
GeneratedRating *-- GenerationMetadata
GeneratedFollow *-- GenerationMetadata
@enduml

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long