This commit is contained in:
Aaron Po
2026-04-01 21:35:02 -04:00
parent 35aa7bc0df
commit 248a51b35f
6 changed files with 82 additions and 71 deletions

View File

@@ -39,8 +39,14 @@ public:
/// @brief Closes the SQLite connection if initialized.
~SqliteDatabase();
/// @brief Opens the in-memory database and creates schema objects.
void Initialize();
/// @brief Opens the SQLite database at dbPath and creates schema objects.
void Initialize(const std::string &dbPath = ":memory:");
/// @brief Starts a database transaction for batched writes.
void BeginTransaction();
/// @brief Commits the active database transaction.
void CommitTransaction();
/// @brief Inserts a country row.
void InsertCountry(int id, const std::string &name, const std::string &iso2,

View File

@@ -1,10 +1,10 @@
#include "data_downloader.h"
#include <cstdio>
#include <curl/curl.h>
#include <filesystem>
#include <fstream>
#include <spdlog/spdlog.h>
#include <sstream>
#include <sys/stat.h>
static size_t WriteCallback(void *contents, size_t size, size_t nmemb,
void *userp) {
@@ -19,8 +19,7 @@ DataDownloader::DataDownloader() {}
DataDownloader::~DataDownloader() {}
/// @brief Checks whether a file exists at the given path.
/// @param filePath Filesystem path to test.
/// @return true if a file or directory exists at filePath.
bool DataDownloader::FileExists(const std::string &filePath) const {
  // std::filesystem::exists replaces the old stat()-based probe; the stale
  // stat return left behind by the diff flattening was unreachable dead code.
  return std::filesystem::exists(filePath);
}
std::string

View File

@@ -48,15 +48,36 @@ SqliteDatabase::~SqliteDatabase() {
}
}
void SqliteDatabase::Initialize() {
int rc = sqlite3_open(":memory:", &db);
void SqliteDatabase::Initialize(const std::string &dbPath) {
int rc = sqlite3_open(dbPath.c_str(), &db);
if (rc) {
throw std::runtime_error("Failed to create in-memory SQLite database");
throw std::runtime_error("Failed to open SQLite database: " + dbPath);
}
spdlog::info("OK: In-memory SQLite database created");
spdlog::info("OK: SQLite database opened: {}", dbPath);
InitializeSchema();
}
/// @brief Starts a database transaction for batched writes.
/// @throws std::runtime_error if the BEGIN statement fails.
void SqliteDatabase::BeginTransaction() {
  std::lock_guard<std::mutex> guard(dbMutex);
  char *errMsg = nullptr;
  const int rc =
      sqlite3_exec(db, "BEGIN TRANSACTION", nullptr, nullptr, &errMsg);
  if (rc != SQLITE_OK) {
    const std::string detail = (errMsg != nullptr) ? errMsg : "unknown";
    sqlite3_free(errMsg);
    throw std::runtime_error("BeginTransaction failed: " + detail);
  }
}
/// @brief Commits the active database transaction.
/// @throws std::runtime_error if the COMMIT statement fails.
void SqliteDatabase::CommitTransaction() {
  std::lock_guard<std::mutex> guard(dbMutex);
  char *errMsg = nullptr;
  const int rc = sqlite3_exec(db, "COMMIT", nullptr, nullptr, &errMsg);
  if (rc != SQLITE_OK) {
    const std::string detail = (errMsg != nullptr) ? errMsg : "unknown";
    sqlite3_free(errMsg);
    throw std::runtime_error("CommitTransaction failed: " + detail);
  }
}
void SqliteDatabase::InsertCountry(int id, const std::string &name,
const std::string &iso2,
const std::string &iso3) {

View File

@@ -1,65 +1,32 @@
#include "json_loader.h"
#include "stream_parser.h"
#include "work_queue.h"
#include <atomic>
#include <chrono>
#include <spdlog/spdlog.h>
#include <thread>
#include <vector>
void JsonLoader::LoadWorldCities(const std::string &jsonPath,
SqliteDatabase &db) {
auto startTime = std::chrono::high_resolution_clock::now();
spdlog::info("\nLoading {} (streaming RapidJSON SAX + producer-consumer)...",
jsonPath);
spdlog::info("\nLoading {} (streaming RapidJSON SAX)...", jsonPath);
const unsigned int QUEUE_CAPACITY = 1000;
WorkQueue<CityRecord> queue(QUEUE_CAPACITY);
db.BeginTransaction();
spdlog::info("Creating worker thread pool...");
unsigned int numWorkers = std::thread::hardware_concurrency();
if (numWorkers == 0)
numWorkers = 4; // Fallback if unavailable
spdlog::info(" Spawning {} worker threads", numWorkers);
std::vector<std::thread> workers;
std::atomic<unsigned long> citiesProcessed{0};
for (unsigned int i = 0; i < numWorkers; ++i) {
workers.push_back(std::thread([&]() {
unsigned long localCount = 0;
while (auto record = queue.pop()) {
db.InsertCity(record->id, record->state_id, record->country_id,
record->name, record->latitude, record->longitude);
localCount++;
}
citiesProcessed += localCount;
}));
}
spdlog::info("Streaming cities into worker queue...");
unsigned long totalCities = 0;
size_t citiesProcessed = 0;
StreamingJsonParser::Parse(
jsonPath, db, [&](const CityRecord &record) { queue.push(record); },
jsonPath, db,
[&](const CityRecord &record) {
db.InsertCity(record.id, record.state_id, record.country_id,
record.name, record.latitude, record.longitude);
citiesProcessed++;
},
[&](size_t current, size_t total) {
if (current % 10000 == 0 && current > 0) {
spdlog::info(" [Progress] Parsed {} cities...", current);
}
totalCities = current;
});
spdlog::info(" OK: Parsed all cities from JSON");
queue.shutdown_queue();
spdlog::info("Waiting for worker threads to complete...");
for (auto &worker : workers) {
if (worker.joinable()) {
worker.join();
}
}
db.CommitTransaction();
auto endTime = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
@@ -74,7 +41,5 @@ void JsonLoader::LoadWorldCities(const std::string &jsonPath,
static_cast<long long>(duration.count())
: 0LL;
spdlog::info("Throughput: {} cities/sec", throughput);
spdlog::info("Worker pool: {} threads", numWorkers);
spdlog::info("Queue capacity: {}", QUEUE_CAPACITY);
spdlog::info("=======================================\n");
}

View File

@@ -3,8 +3,13 @@
#include "generator.h"
#include "json_loader.h"
#include <curl/curl.h>
#include <filesystem>
#include <spdlog/spdlog.h>
/// @brief Reports whether a filesystem entry exists at the given path.
static bool FileExists(const std::string &filePath) {
  const std::filesystem::path candidate{filePath};
  return std::filesystem::exists(candidate);
}
int main(int argc, char *argv[]) {
try {
curl_global_init(CURL_GLOBAL_DEFAULT);
@@ -15,17 +20,25 @@ int main(int argc, char *argv[]) {
argc > 3 ? argv[3] : "c5eb7772"; // Default: stable 2026-03-28
std::string jsonPath = cacheDir + "/countries+states+cities.json";
std::string dbPath = cacheDir + "/biergarten-pipeline.db";
spdlog::info("\n[Pipeline] Downloading geographic data from GitHub...");
DataDownloader downloader;
downloader.DownloadCountriesDatabase(jsonPath, commit);
bool hasJsonCache = FileExists(jsonPath);
bool hasDbCache = FileExists(dbPath);
SqliteDatabase db;
spdlog::info("Initializing in-memory SQLite database...");
db.Initialize();
spdlog::info("Initializing SQLite database at {}...", dbPath);
db.Initialize(dbPath);
JsonLoader::LoadWorldCities(jsonPath, db);
if (hasDbCache && hasJsonCache) {
spdlog::info("[Pipeline] Cache hit: skipping download and parse");
} else {
spdlog::info("\n[Pipeline] Downloading geographic data from GitHub...");
DataDownloader downloader;
downloader.DownloadCountriesDatabase(jsonPath, commit);
JsonLoader::LoadWorldCities(jsonPath, db);
}
spdlog::info("Initializing brewery generator...");
LlamaBreweryGenerator generator;

View File

@@ -1,10 +1,10 @@
#include "stream_parser.h"
#include "database.h"
#include <fstream>
#include <cstdio>
#include <rapidjson/filereadstream.h>
#include <rapidjson/reader.h>
#include <rapidjson/stringbuffer.h>
#include <spdlog/spdlog.h>
#include <sstream>
using namespace rapidjson;
@@ -196,32 +196,39 @@ void StreamingJsonParser::Parse(
spdlog::info(" Streaming parse of {}...", filePath);
std::ifstream file(filePath, std::ios::binary);
if (!file.is_open()) {
FILE *file = std::fopen(filePath.c_str(), "rb");
if (!file) {
throw std::runtime_error("Failed to open JSON file: " + filePath);
}
std::stringstream buffer;
buffer << file.rdbuf();
file.close();
std::string json_str = buffer.str();
size_t total_size = json_str.length();
size_t total_size = 0;
if (std::fseek(file, 0, SEEK_END) == 0) {
long file_size = std::ftell(file);
if (file_size > 0) {
total_size = static_cast<size_t>(file_size);
}
std::rewind(file);
}
CityRecordHandler::ParseContext ctx{&db, onCity, onProgress, 0,
total_size, 0, 0};
CityRecordHandler handler(ctx);
Reader reader;
StringStream ss(json_str.c_str());
char buf[65536];
FileReadStream frs(file, buf, sizeof(buf));
if (!reader.Parse(ss, handler)) {
if (!reader.Parse(frs, handler)) {
ParseErrorCode errCode = reader.GetParseErrorCode();
size_t errOffset = reader.GetErrorOffset();
std::fclose(file);
throw std::runtime_error(std::string("JSON parse error at offset ") +
std::to_string(errOffset) +
" (code: " + std::to_string(errCode) + ")");
}
std::fclose(file);
spdlog::info(" OK: Parsed {} countries, {} states, {} cities",
ctx.countries_inserted, ctx.states_inserted, ctx.cities_emitted);
}