diff --git a/pipeline/README.md b/pipeline/README.md index 3ef26d2..848cbac 100644 --- a/pipeline/README.md +++ b/pipeline/README.md @@ -1,217 +1,399 @@ -Biergarten Pipeline +# Biergarten Pipeline -Overview +A high-performance C++23 data pipeline for fetching, parsing, and storing geographic data (countries, states, cities) with brewery metadata generation capabilities. The system supports both mock and LLM-based (llama.cpp) generation modes. -The pipeline orchestrates five key stages: +## Overview -Download: Fetches countries+states+cities.json from a pinned GitHub commit with optional local caching. +The pipeline orchestrates **four key stages**: -Parse: Streams JSON using Boost.JSON's basic_parser to extract country/state/city records without loading the entire file into memory. +1. **Download** - Fetches `countries+states+cities.json` from a pinned GitHub commit with optional local filesystem caching +2. **Parse** - Streams JSON using Boost.JSON's `basic_parser` to extract country/state/city records without loading the entire file into memory +3. **Store** - Inserts records into a file-based SQLite database with all operations performed sequentially in a single thread +4. **Generate** - Produces brewery metadata or user profiles (mock implementation; supports future LLM integration via llama.cpp) -Buffer: Routes city records through a bounded concurrent queue to decouple parsing from writes. +## System Architecture -Store: Inserts records with concurrent thread safety using an in-memory SQLite database. +### Data Sources and Formats -Generate: Produces mock brewery metadata for a sample of cities (mockup for future LLM integration). +- **Hierarchical Structure**: Countries array → states per country → cities per state +- **Data Fields**: + - `id` (integer) + - `name` (string) + - `iso2` / `iso3` (ISO country/state codes) + - `latitude` / `longitude` (geographic coordinates) +- **Source**: [dr5hn/countries-states-cities-database](https://github.com/dr5hn/countries-states-cities-database) on GitHub +- **Output**: Structured SQLite file-based database (`biergarten-pipeline.db`) + structured logging via spdlog -Architecture +### Concurrency Model -Data Sources and Formats +The pipeline currently operates **single-threaded** with sequential stage execution: -Hierarchical structure: countries array → states per country → cities per state. +1. **Download Phase**: Main thread blocks while downloading the source JSON file (if not in cache) +2. **Parse & Store Phase**: Main thread performs streaming JSON parse with immediate SQLite inserts -Fields: id (integer), name (string), iso2 / iso3 (codes), latitude / longitude. +**Thread Safety**: While single-threaded, the `SqliteDatabase` component is **mutex-protected** using `std::mutex` (`dbMutex`) for all database operations. This design enables safe future parallelization without code modifications. -Sourced from: dr5hn/countries-states-cities-database on GitHub. +## Core Components -Output: Structured SQLite in-memory database + console logs via spdlog. +| Component | Purpose | Thread Safety | Dependencies | +| ----------------------------- | ----------------------------------------------------------------------------------------------- | -------------------------------------------- | --------------------------------------------- | +| **BiergartenDataGenerator** | Orchestrates pipeline execution; manages lifecycle of downloader, parser, and generator | Single-threaded coordinator | ApplicationOptions, WebClient, SqliteDatabase | +| **DataDownloader** | HTTP fetch with curl; optional filesystem cache; ETag support and retries | Blocking I/O; safe for startup | IWebClient, filesystem | +| **StreamingJsonParser** | Extends `boost::json::basic_parser`; emits country/state/city via callbacks; tracks parse depth | Single-threaded parse; callbacks thread-safe | Boost.JSON | +| **JsonLoader** | Wraps parser; dispatches callbacks for country/state/city; manages WorkQueue lifecycle | Produces to WorkQueue; safe callbacks | StreamingJsonParser, SqliteDatabase | +| **SqliteDatabase** | Manages schema initialization; insert/query methods for geographic data | Mutex-guarded all operations | SQLite3 | +| **IDataGenerator** (Abstract) | Interface for brewery/user metadata generation | Stateless virtual methods | N/A | +| **LlamaGenerator** | LLM-based generation via llama.cpp; configurable sampling (temperature, top-p, seed) | Manages llama_model* and llama_context* | llama.cpp, BreweryResult, UserResult | +| **MockGenerator** | Deterministic mock generation using seeded randomization | Stateless; thread-safe | N/A | +| **CURLWebClient** | HTTP client adapter; URL encoding; file downloads | cURL library bindings | libcurl | +| **WikipediaService** | (Planned) Wikipedia data lookups for enrichment | N/A | IWebClient | -Concurrency Architecture +## Database Schema -The pipeline splits work across parsing and writing phases: +SQLite file-based database with **three core tables** and **indexes for fast lookups**: -Main Thread: -parse_sax() -> Insert countries (direct) --> Insert states (direct) --> Push CityRecord to WorkQueue - -Worker Threads (implicit; pthread pool via sqlite3): -Pop CityRecord from WorkQueue --> InsertCity(db) with mutex protection - -Key synchronization primitives: - -WorkQueue: Bounded (default 1024 items) concurrent queue with blocking push/pop, guarded by mutex + condition variables. - -SqliteDatabase::dbMutex: Serializes all SQLite operations to avoid SQLITE_BUSY and ensure write safety. - -Backpressure: When the WorkQueue fills (≥1024 city records pending), the parser thread blocks until workers drain items. - -Component Responsibilities - -Component - -Purpose - -Thread Safety - -DataDownloader - -GitHub fetch with curl; optional filesystem cache; handles retries and ETags. - -Blocking I/O; safe for single-threaded startup. - -StreamingJsonParser - -Subclasses boost::json::basic_parser; emits country/state/city via callbacks; tracking parse depth. - -Single-threaded parse phase; thread-safe callbacks. - -JsonLoader - -Wraps parser; runs country/state/city callbacks; manages WorkQueue lifecycle. - -Produces to WorkQueue; consumes from callbacks. - -SqliteDatabase - -In-memory schema; insert/query methods; mutex-protected SQL operations. - -Mutex-guarded; thread-safe concurrent inserts. - -LlamaBreweryGenerator - -Mock brewery text generation using deterministic seed-based selection. - -Stateless; thread-safe method calls. - -Database Schema - -SQLite in-memory database with three core tables: - -Countries +### Countries +```sql CREATE TABLE countries ( -id INTEGER PRIMARY KEY, -name TEXT NOT NULL, -iso2 TEXT, -iso3 TEXT + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + iso2 TEXT, + iso3 TEXT ); CREATE INDEX idx_countries_iso2 ON countries(iso2); +``` -States +### States +```sql CREATE TABLE states ( -id INTEGER PRIMARY KEY, -country_id INTEGER NOT NULL, -name TEXT NOT NULL, -iso2 TEXT, -FOREIGN KEY (country_id) REFERENCES countries(id) + id INTEGER PRIMARY KEY, + country_id INTEGER NOT NULL, + name TEXT NOT NULL, + iso2 TEXT, + FOREIGN KEY (country_id) REFERENCES countries(id) ); CREATE INDEX idx_states_country ON states(country_id); +``` -Cities +### Cities +```sql CREATE TABLE cities ( -id INTEGER PRIMARY KEY, -state_id INTEGER NOT NULL, -country_id INTEGER NOT NULL, -name TEXT NOT NULL, -latitude REAL, -longitude REAL, -FOREIGN KEY (state_id) REFERENCES states(id), -FOREIGN KEY (country_id) REFERENCES countries(id) + id INTEGER PRIMARY KEY, + state_id INTEGER NOT NULL, + country_id INTEGER NOT NULL, + name TEXT NOT NULL, + latitude REAL, + longitude REAL, + FOREIGN KEY (state_id) REFERENCES states(id), + FOREIGN KEY (country_id) REFERENCES countries(id) ); CREATE INDEX idx_cities_state ON cities(state_id); CREATE INDEX idx_cities_country ON cities(country_id); +``` -Configuration and Extensibility +## Architecture Diagram -Command-Line Arguments +```plantuml +@startuml biergarten-pipeline +!theme plain +skinparam monochrome true +skinparam classBackgroundColor #FFFFFF +skinparam classBorderColor #000000 -Boost.Program_options provides named CLI arguments: +package "Application Layer" { + class BiergartenDataGenerator { + - options: ApplicationOptions + - webClient: IWebClient + - database: SqliteDatabase + - generator: IDataGenerator + -- + + Run() : int + } +} +package "Data Acquisition" { + class DataDownloader { + - webClient: IWebClient + -- + + Download(url: string, filePath: string) + + DownloadWithCache(url: string, cachePath: string) + } + + interface IWebClient { + + DownloadToFile(url: string, filePath: string) + + Get(url: string) : string + + UrlEncode(value: string) : string + } + + class CURLWebClient { + - globalState: CurlGlobalState + -- + + DownloadToFile(url: string, filePath: string) + + Get(url: string) : string + + UrlEncode(value: string) : string + } +} + +package "JSON Processing" { + class StreamingJsonParser { + - depth: int + -- + + on_object_begin() + + on_object_end() + + on_array_begin() + + on_array_end() + + on_key(str: string) + + on_string(str: string) + + on_number(value: int) + } + + class JsonLoader { + -- + + LoadWorldCities(jsonPath: string, db: SqliteDatabase) + } +} + +package "Data Storage" { + class SqliteDatabase { + - db: sqlite3* + - dbMutex: std::mutex + -- + + Initialize(dbPath: string) + + InsertCountry(id: int, name: string, iso2: string, iso3: string) + + InsertState(id: int, countryId: int, name: string, iso2: string) + + InsertCity(id: int, stateId: int, countryId: int, name: string, lat: double, lon: double) + + QueryCountries(limit: int) : vector + + QueryStates(limit: int) : vector + + QueryCities() : vector + + BeginTransaction() + + CommitTransaction() + # InitializeSchema() + } + + struct Country { + id: int + name: string + iso2: string + iso3: string + } + + struct State { + id: int + name: string + iso2: string + countryId: int + } + + struct City { + id: int + name: string + countryId: int + } +} + +package "Data Generation" { + interface IDataGenerator { + + load(modelPath: string) + + generateBrewery(cityName: string, countryName: string, regionContext: string) : BreweryResult + + generateUser(locale: string) : UserResult + } + + class LlamaGenerator { + - model: llama_model* + - context: llama_context* + - sampling_temperature: float + - sampling_top_p: float + - sampling_seed: uint32_t + -- + + load(modelPath: string) + + generateBrewery(...) : BreweryResult + + generateUser(locale: string) : UserResult + + setSamplingOptions(temperature: float, topP: float, seed: int) + # infer(prompt: string) : string + } + + class MockGenerator { + -- + + load(modelPath: string) + + generateBrewery(...) : BreweryResult + + generateUser(locale: string) : UserResult + } + + struct BreweryResult { + name: string + description: string + } + + struct UserResult { + username: string + bio: string + } +} + +package "Enrichment (Planned)" { + class WikipediaService { + - webClient: IWebClient + -- + + SearchCity(cityName: string, countryName: string) : string + } +} + +' Relationships +BiergartenDataGenerator --> DataDownloader +BiergartenDataGenerator --> JsonLoader +BiergartenDataGenerator --> SqliteDatabase +BiergartenDataGenerator --> IDataGenerator + +DataDownloader --> IWebClient +CURLWebClient ..|> IWebClient + +JsonLoader --> StreamingJsonParser +JsonLoader --> SqliteDatabase + +LlamaGenerator ..|> IDataGenerator +MockGenerator ..|> IDataGenerator + +SqliteDatabase --> Country +SqliteDatabase --> State +SqliteDatabase --> City + +LlamaGenerator --> BreweryResult +LlamaGenerator --> UserResult +MockGenerator --> BreweryResult +MockGenerator --> UserResult + +WikipediaService --> IWebClient + +@enduml +``` + +## Configuration and Extensibility + +### Command-Line Arguments + +Boost.Program_options provides named CLI arguments. Running without arguments displays usage instructions. + +```bash ./biergarten-pipeline [options] +``` -Arg +**Requirement**: Exactly one of `--mocked` or `--model` must be specified. -Default +| Argument | Short | Type | Purpose | +| --------------- | ----- | ------ | --------------------------------------------------------------- | +| `--mocked` | - | flag | Use mocked generator for brewery/user data | +| `--model` | `-m` | string | Path to LLM model file (gguf); mutually exclusive with --mocked | +| `--cache-dir` | `-c` | path | Directory for cached JSON (default: `/tmp`) | +| `--temperature` | - | float | LLM sampling temperature 0.0-1.0 (default: `0.8`) | +| `--top-p` | - | float | Nucleus sampling parameter 0.0-1.0 (default: `0.92`) | +| `--seed` | - | int | Random seed: -1 for random (default: `-1`) | +| `--help` | `-h` | flag | Show help message | -Purpose +**Note**: The data source is always pinned to commit `c5eb7772` (stable 2026-03-28) and cannot be changed. ---model, -m +**Note**: When `--mocked` is used, any sampling parameters (`--temperature`, `--top-p`, `--seed`) are ignored with a warning. -"" +### Usage Examples -Path to LLM model (mock implementation used if left blank). +```bash +# Mocked generator (deterministic, no LLM required) +./biergarten-pipeline --mocked ---cache-dir, -c - -/tmp - -Directory for cached JSON DB. - ---commit - -c5eb7772 - -Git commit hash for consistency (stable 2026-03-28 snapshot). - ---help, -h - -- - -Show help menu. - -Examples: - -./biergarten-pipeline +# With LLM model ./biergarten-pipeline --model ./models/llama.gguf --cache-dir /var/cache -./biergarten-pipeline -c /tmp --commit v1.2.3 -Building and Running +# Mocked with extra parameters provided (will be ignored with warning) +./biergarten-pipeline --mocked --temperature 0.5 --top-p 0.8 --seed 42 -Prerequisites +# Show help +./biergarten-pipeline --help +``` -C++23 compiler (g++, clang, MSVC). +## Building and Running -CMake 3.20+. +### Prerequisites -curl (for HTTP downloads). +- **C++23 compiler** (g++, clang, MSVC) +- **CMake** 3.20+ +- **curl** (for HTTP downloads) +- **sqlite3** (database backend) +- **Boost** 1.75+ (requires Boost.JSON and Boost.Program_options) +- **spdlog** v1.11.0 (fetched via CMake FetchContent) +- **llama.cpp** (fetched via CMake FetchContent for LLM inference) -sqlite3. - -Boost 1.75+ (requires Boost.JSON and Boost.Program_options). - -spdlog (fetched via CMake FetchContent). - -Build +### Build +```bash mkdir -p build cd build cmake .. cmake --build . --target biergarten-pipeline -- -j +``` -Run +### Run -./biergarten-pipeline +```bash +./build/biergarten-pipeline +``` -Output: Logs to console; caches JSON in /tmp/countries+states+cities.json. +**Output**: -Code Style and Static Analysis +- Console logs with structured spdlog output +- Cached JSON file: `/tmp/countries+states+cities.json` +- SQLite database: `biergarten-pipeline.db` (in output directory) -This project is configured to use: +## Code Quality and Static Analysis -- clang-format with the Google C++ style guide (via .clang-format) -- clang-tidy checks focused on Google, modernize, performance, and bug-prone rules (via .clang-tidy) +### Formatting -After configuring CMake, use: +This project uses **clang-format** with the **Google C++ style guide**: -cmake --build . --target format +```bash +# Apply formatting to all source files +cmake --build build --target format -to apply formatting, and: +# Check formatting without modifications +cmake --build build --target format-check +``` -cmake --build . --target format-check +### Static Analysis + +This project uses **clang-tidy** with configurations for Google, modernize, performance, and bug-prone rules (`.clang-tidy`): + +Static analysis runs automatically during compilation if `clang-tidy` is available. + +## Code Implementation Summary + +### Key Achievements + +✅ **Full pipeline implementation** - Download → Parse → Store → Generate +✅ **Streaming JSON parser** - Memory-efficient processing via Boost.JSON callbacks +✅ **Thread-safe SQLite wrapper** - Mutex-protected database for future parallelization +✅ **Flexible data generation** - Abstract IDataGenerator interface supporting both mock and LLM modes +✅ **Comprehensive CLI** - Boost.Program_options with sensible defaults +✅ **Production-grade logging** - spdlog integration for structured output +✅ **Build quality** - CMake with clang-format/clang-tidy integration + +### Architecture Patterns + +- **Interface-based design**: `IWebClient`, `IDataGenerator` abstract base classes enable substitution and testing +- **Dependency injection**: Components receive dependencies via constructors (BiergartenDataGenerator) +- **RAII principle**: SQLite connections and resources managed via destructors +- **Callback-driven parsing**: Boost.JSON parser emits events to processing callbacks +- **Transaction-scoped inserts**: BeginTransaction/CommitTransaction for batch performance + +### External Dependencies + +| Dependency | Version | Purpose | Type | +| ---------- | ------- | ---------------------------------- | ------- | +| Boost | 1.75+ | JSON parsing, CLI argument parsing | Library | +| SQLite3 | - | Persistent data storage | System | +| libcurl | - | HTTP downloads | System | +| spdlog | v1.11.0 | Structured logging | Fetched | +| llama.cpp | b8611 | LLM inference engine | Fetched | to validate formatting without modifying files. diff --git a/pipeline/includes/application_options.h b/pipeline/includes/application_options.h deleted file mode 100644 index 3f59c93..0000000 --- a/pipeline/includes/application_options.h +++ /dev/null @@ -1,2 +0,0 @@ -#pragma once - diff --git a/pipeline/includes/biergarten_data_generator.h b/pipeline/includes/biergarten_data_generator.h index b2c75d3..d13cef1 100644 --- a/pipeline/includes/biergarten_data_generator.h +++ b/pipeline/includes/biergarten_data_generator.h @@ -5,7 +5,6 @@ #include #include -#include "application_options.h" #include "data_generation/data_generator.h" #include "database/database.h" #include "web_client/web_client.h" @@ -16,9 +15,12 @@ * @brief Program options for the Biergarten pipeline application. */ struct ApplicationOptions { - /// @brief Path to the LLM model file (gguf format). + /// @brief Path to the LLM model file (gguf format); mutually exclusive with useMocked. std::string modelPath; + /// @brief Use mocked generator instead of LLM; mutually exclusive with modelPath. + bool useMocked = false; + /// @brief Directory for cached JSON and database files. std::string cacheDir; @@ -31,7 +33,7 @@ struct ApplicationOptions { /// @brief Random seed for sampling (-1 for random, otherwise non-negative). int seed = -1; - /// @brief Git commit hash for database consistency. + /// @brief Git commit hash for database consistency (always pinned to c5eb7772). std::string commit = "c5eb7772"; }; diff --git a/pipeline/src/main.cpp b/pipeline/src/main.cpp index 72ce570..a66bbe4 100644 --- a/pipeline/src/main.cpp +++ b/pipeline/src/main.cpp @@ -4,7 +4,6 @@ #include #include -#include "application_options.h" #include "biergarten_data_generator.h" #include "web_client/curl_web_client.h" #include "database/database.h" @@ -17,11 +16,30 @@ namespace po = boost::program_options; * @param argc Command-line argument count. * @param argv Command-line arguments. * @param options Output ApplicationOptions struct. - * @return true if parsing succeeded and help was not requested, false otherwise. + * @return true if parsing succeeded and should proceed, false otherwise. */ bool ParseArguments(int argc, char **argv, ApplicationOptions &options) { + // If no arguments provided, display usage and exit + if (argc == 1) { + std::cout << "Biergarten Pipeline - Geographic Data Pipeline with Brewery Generation\n\n"; + std::cout << "Usage: biergarten-pipeline [options]\n\n"; + std::cout << "Options:\n"; + std::cout << " --mocked Use mocked generator for brewery/user data\n"; + std::cout << " --model, -m PATH Path to LLM model file (gguf) for generation\n"; + std::cout << " --cache-dir, -c DIR Directory for cached JSON (default: /tmp)\n"; + std::cout << " --temperature TEMP LLM sampling temperature 0.0-1.0 (default: 0.8)\n"; + std::cout << " --top-p VALUE Nucleus sampling parameter 0.0-1.0 (default: 0.92)\n"; + std::cout << " --seed SEED Random seed: -1 for random (default: -1)\n"; + std::cout << " --help, -h Show this help message\n\n"; + std::cout << "Note: --mocked and --model are mutually exclusive. Exactly one must be provided.\n"; + std::cout << "Data source is always pinned to commit c5eb7772 (stable 2026-03-28).\n"; + return false; + } + po::options_description desc("Pipeline Options"); desc.add_options()("help,h", "Produce help message")( + "mocked", po::bool_switch(), + "Use mocked generator for brewery/user data")( "model,m", po::value()->default_value(""), "Path to LLM model (gguf)")( "cache-dir,c", po::value()->default_value("/tmp"), @@ -31,9 +49,7 @@ bool ParseArguments(int argc, char **argv, ApplicationOptions &options) { "top-p", po::value()->default_value(0.92f), "Nucleus sampling top-p in (0,1] (higher = more random)")( "seed", po::value()->default_value(-1), - "Sampler seed: -1 for random, otherwise non-negative integer")( - "commit", po::value()->default_value("c5eb7772"), - "Git commit hash for DB consistency"); + "Sampler seed: -1 for random, otherwise non-negative integer"); po::variables_map vm; po::store(po::parse_command_line(argc, argv, desc), vm); @@ -44,12 +60,38 @@ bool ParseArguments(int argc, char **argv, ApplicationOptions &options) { return false; } - options.modelPath = vm["model"].as(); + // Check for mutually exclusive --mocked and --model flags + bool useMocked = vm["mocked"].as(); + std::string modelPath = vm["model"].as(); + + if (useMocked && !modelPath.empty()) { + spdlog::error("ERROR: --mocked and --model are mutually exclusive"); + return false; + } + + if (!useMocked && modelPath.empty()) { + spdlog::error("ERROR: Either --mocked or --model must be specified"); + return false; + } + + // Warn if sampling parameters are provided with --mocked + if (useMocked) { + bool hasTemperature = vm["temperature"].defaulted() == false; + bool hasTopP = vm["top-p"].defaulted() == false; + bool hasSeed = vm["seed"].defaulted() == false; + + if (hasTemperature || hasTopP || hasSeed) { + spdlog::warn("WARNING: Sampling parameters (--temperature, --top-p, --seed) are ignored when using --mocked"); + } + } + + options.useMocked = useMocked; + options.modelPath = modelPath; options.cacheDir = vm["cache-dir"].as(); options.temperature = vm["temperature"].as(); options.topP = vm["top-p"].as(); options.seed = vm["seed"].as(); - options.commit = vm["commit"].as(); + // commit is always pinned to c5eb7772 return true; }