Project Overview
ETL_C- delivers a configurable, high-performance pipeline for ingesting JSON data into PostgreSQL. It streamlines end-to-end ETL workflows by decoupling schema definitions and load order into JSON configuration files, while providing built-in performance metrics and multiple parsing strategies.
Why This Project Exists
- Automate bulk JSON→PostgreSQL ingestion without hand-crafted SQL
- Enable rapid schema mapping changes via declarative JSON configs
- Benchmark and compare JSON parsers for optimal throughput
Primary Goals
- Offer a single executable (main.cpp) that loads:
  - Extraction configuration (data sources, parsers, performance mode)
  - Schema and relationship definitions
  - Database connection and load order
- Create or truncate tables, enforce foreign-key relationships, and insert transformed data
- Report timing metrics for each ETL stage
When to Use
- Loading large or nested JSON datasets into PostgreSQL
- Iterating on schema designs without recompiling C++
- Evaluating SIMDJSON vs. nlohmann::json parsing performance
- Incorporating an out-of-the-box ETL tool into CI/CD pipelines
Key Features
- JSON→PostgreSQL ingestion with:
- nlohmann::json (default)
- SIMDJSON “performance mode” for ultra-fast parsing
- Declarative schema mapping:
- Define tables, columns, types, and relationships in JSON
- Control load order and optional table truncation
- Automatic DDL generation and relationship enforcement
- Detailed performance metrics per stage (extract, transform, load)
- Dependency management via vcpkg (PostgreSQL client, JSON libraries)
Licensing
Released under the MIT License—use, modify, and distribute with no warranties.
Getting Started
This guide shows how to compile and run your first ETL job in minutes. You’ll install dependencies via vcpkg, build with CMake, prepare PostgreSQL, and execute the sample pipeline using the bundled JSON data.
1. Prerequisites
- Git
- CMake ≥ 3.15
- A C++17 compiler (GCC, Clang or MSVC)
- PostgreSQL server
- vcpkg (with your project’s vcpkg-configuration.json in place)
2. Clone and Bootstrap vcpkg
# Clone the ETL project
git clone https://github.com/PerezO12/ETL_C-.git
cd ETL_C-
# Bootstrap vcpkg (if you haven't already)
git clone https://github.com/microsoft/vcpkg.git
cd vcpkg
./bootstrap-vcpkg.sh # or .\bootstrap-vcpkg.bat on Windows
cd ..
3. Install Dependencies via vcpkg
Create a vcpkg.json manifest in the project root (next to vcpkg-configuration.json):
{
"name": "etl-project",
"version-string": "1.0.0",
"dependencies": [
"nlohmann-json",
"postgresql",
"simdjson"
]
}
Then install:
./vcpkg/vcpkg install --triplet x64-linux
# or
./vcpkg/vcpkg install --triplet x64-windows
4. Configure and Build with CMake
mkdir build && cd build
cmake .. \
-DCMAKE_TOOLCHAIN_FILE=../vcpkg/scripts/buildsystems/vcpkg.cmake \
-DCMAKE_BUILD_TYPE=Release
cmake --build . --config Release
This produces the etl_processor executable.
5. Prepare PostgreSQL
Start your PostgreSQL server.
Create a database and user:
CREATE USER etl_user WITH PASSWORD 'etl_pass';
CREATE DATABASE etl_db OWNER etl_user;
Update config/config.json under the database section to match your credentials:
{
  "database": {
    "host": "localhost",
    "port": 5432,
    "dbname": "etl_db",
    "user": "etl_user",
    "password": "etl_pass"
  },
  ...
}
6. Execute the Sample Pipeline
Run the ETL processor pointing at the configuration and one of the sample data files:
# Using datos.json
./etl_processor ../config/config.json ../data/datos.json
# Or using datos2.json
./etl_processor ../config/config.json ../data/datos2.json
Upon completion you’ll see timing metrics for JSON extraction (nlohmann/json vs. simdjson) and confirmation of table creation and row inserts.
7. Verify Results
Connect with psql or any SQL client:
psql -h localhost -U etl_user -d etl_db
Run a simple query:
SELECT COUNT(*) FROM estudiantes;
SELECT * FROM asignaturas LIMIT 5;
You should see the transformed records loaded into your tables.
Next Steps
- Explore config/config.json to adjust table mappings, relationships, and transformations.
- Extend src/main.cpp with custom extraction logic or new data sources.
- Use the --truncate or --no-relations flags (see README) to control the load process for testing.
Configuration Reference
This section details every field in config/config.json, covering database setup, data loading, schema definitions, relationships, and ETL transformations.
database
Defines PostgreSQL connection parameters.
- host (string): Database server host
- port (string|number): TCP port (e.g. 5432)
- dbname (string): Name of the database
- user (string): Username for authentication
- password (string): Password for authentication
Example
"database": {
"host": "localhost",
"port": 5432,
"dbname": "university",
"user": "etl_user",
"password": "s3cret"
}
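For illustration, here is a minimal sketch of turning these fields into a libpq connection; the helper name connectFromConfig is hypothetical, and the port is normalized because the config accepts either a string or a number:
#include <nlohmann/json.hpp>
#include <libpq-fe.h>
#include <stdexcept>
#include <string>

using nlohmann::json;

// Hypothetical helper: build a connection string from the "database" section
// and open a PostgreSQL connection, throwing on failure.
PGconn* connectFromConfig(const json& db) {
  std::string port = db["port"].is_string()
                         ? db["port"].get<std::string>()
                         : std::to_string(db["port"].get<int>());
  std::string conninfo =
      "host=" + db["host"].get<std::string>() +
      " port=" + port +
      " dbname=" + db["dbname"].get<std::string>() +
      " user=" + db["user"].get<std::string>() +
      " password=" + db["password"].get<std::string>();
  PGconn* conn = PQconnectdb(conninfo.c_str());
  if (PQstatus(conn) != CONNECTION_OK) {
    std::string err = PQerrorMessage(conn);
    PQfinish(conn);
    throw std::runtime_error("Connection failed: " + err);
  }
  return conn;
}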
dataSources
Maps logical source names to input files or endpoints.
- <sourceName> (object):
  - path (string): File system path or URL
  - format (string): Data format ("json")
Example
"dataSources": {
"alumnos": {
"path": "data/alumnos.json",
"format": "json"
},
"profesores": {
"path": "data/profesores.json",
"format": "json"
}
}
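As a sketch, each entry can be iterated and handed to the extraction helpers described later in this guide; loadSources is a hypothetical wrapper, not part of the project API:
#include "etl/Extraction.hpp"
#include <nlohmann/json.hpp>
#include <map>
#include <stdexcept>
#include <string>

using nlohmann::json;

// Hypothetical helper: read every configured source into memory, keyed by name.
std::map<std::string, json> loadSources(const json& cfg) {
  std::map<std::string, json> out;
  for (auto& [name, src] : cfg["dataSources"].items()) {
    if (src.value("format", "json") != "json")
      throw std::runtime_error("Unsupported format for source: " + name);
    out[name] = extractData(src["path"].get<std::string>());
  }
  return out;
}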
globalOptions
Controls load behavior.
- loadOrder (array[string]): Sequence of table names to load (parents before children)
- truncateBeforeLoad (boolean): If true, issues TRUNCATE … RESTART IDENTITY CASCADE before inserts
Example
"globalOptions": {
"loadOrder": ["profesores","asignaturas","alumnos"],
"truncateBeforeLoad": true
}
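To picture what truncateBeforeLoad does, the sketch below issues the statement for each table in loadOrder. The project performs this step automatically when the flag is set; truncateIfRequested is only an illustrative helper:
#include <libpq-fe.h>
#include <nlohmann/json.hpp>
#include <stdexcept>
#include <string>

using nlohmann::json;

// Hypothetical sketch: clear every table in loadOrder before inserting.
// RESTART IDENTITY resets SERIAL counters; CASCADE also clears dependents.
void truncateIfRequested(PGconn* conn, const json& globalOptions) {
  if (!globalOptions.value("truncateBeforeLoad", false)) return;
  for (const auto& table : globalOptions["loadOrder"]) {
    std::string sql = "TRUNCATE TABLE " + table.get<std::string>() +
                      " RESTART IDENTITY CASCADE;";
    PGresult* res = PQexec(conn, sql.c_str());
    if (PQresultStatus(res) != PGRES_COMMAND_OK) {
      std::string err = PQerrorMessage(conn);
      PQclear(res);
      throw std::runtime_error("TRUNCATE failed: " + err);
    }
    PQclear(res);
  }
}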
tables
Defines table schemas and column mappings.
Each entry key is a table name; value is its schema object:
- generatedPK (boolean, default true): Adds id SERIAL PRIMARY KEY if missing
- naturalKey (array[string], optional): Column(s) for lookup by natural key
- columns (array[object]): List of columns:
  - name (string): Column name
  - type (string): SQL type (e.g. "TEXT", "INT")
  - unique (boolean, optional): Adds a UNIQUE constraint
  - references (object, optional):
    - table (string): Referenced table
    - column (string): Referenced column
Example
"tables": {
"profesores": {
"generatedPK": true,
"naturalKey": ["email"],
"columns": [
{ "name": "nombre", "type": "TEXT" },
{ "name": "email", "type": "TEXT", "unique": true }
]
},
"asignaturas": {
"columns": [
{ "name": "titulo", "type": "TEXT" },
{ "name": "profesor_id", "type": "INT", "references": { "table": "profesores", "column": "id" } }
]
}
}
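To make the mapping concrete, the sketch below assembles the CREATE TABLE statement implied by one of these entries. It is illustrative only (buildCreateTable is not the project's SchemaManager code), but it mirrors the rules above: optional SERIAL primary key, UNIQUE, and REFERENCES:
#include <nlohmann/json.hpp>
#include <string>

using nlohmann::json;

// Illustrative DDL assembly from a "tables" entry.
std::string buildCreateTable(const std::string& name, const json& tbl) {
  std::string sql = "CREATE TABLE IF NOT EXISTS " + name + " (";
  bool first = true;
  if (tbl.value("generatedPK", true)) {
    sql += "id SERIAL PRIMARY KEY";
    first = false;
  }
  for (const auto& col : tbl["columns"]) {
    if (!first) sql += ", ";
    first = false;
    sql += col["name"].get<std::string>() + " " + col["type"].get<std::string>();
    if (col.value("unique", false)) sql += " UNIQUE";
    if (col.contains("references")) {
      const auto& ref = col["references"];
      sql += " REFERENCES " + ref["table"].get<std::string>() +
             "(" + ref["column"].get<std::string>() + ")";
    }
  }
  return sql + ");";
}
For the profesores entry above, this yields roughly: CREATE TABLE IF NOT EXISTS profesores (id SERIAL PRIMARY KEY, nombre TEXT, email TEXT UNIQUE);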
relationships
Defines many‐to‐many junctions between tables.
- type (string): Must be "MANY_TO_MANY"
- left (string): Left table name
- right (string): Right table name
- table (string): Junction table name
Example
"relationships": [
{
"type": "MANY_TO_MANY",
"left": "alumnos",
"right": "asignaturas",
"table": "alumnos_asignaturas"
}
]
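A MANY_TO_MANY entry implies a junction table with one foreign key per side. The sketch below shows the kind of DDL this corresponds to; the <table>_id column naming is an assumption made for illustration, not a documented convention:
#include <nlohmann/json.hpp>
#include <string>

using nlohmann::json;

// Illustrative junction-table DDL for a MANY_TO_MANY relationship.
std::string buildJunctionTable(const json& rel) {
  std::string left  = rel["left"].get<std::string>();
  std::string right = rel["right"].get<std::string>();
  std::string table = rel["table"].get<std::string>();
  return "CREATE TABLE IF NOT EXISTS " + table + " (" +
         left  + "_id INT REFERENCES " + left  + "(id), " +
         right + "_id INT REFERENCES " + right + "(id), " +
         "PRIMARY KEY (" + left + "_id, " + right + "_id));";
}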
transformations
Defines reusable string‐based ETL operations applied at runtime.
Each key is a transform name; value specifies:
- input (string): Expected input type ("string")
- output (string|array[string]): Output field(s) or type
- logic (string): Identifier for the built-in logic branch in applyTransformation
Example
"transformations": {
"split_fullname": {
"input": "string",
"output": ["nombre","apellido"],
"logic": "split_by_space"
},
"trim_uppercase": {
"input": "string",
"output": "string",
"logic": "trim && uppercase"
}
}
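The logic identifiers select hard-coded branches inside applyTransformation. The standalone sketches below show what the two built-in behaviors amount to; they are simplified stand-ins for the real branches, not the project's exact code:
#include <algorithm>
#include <cctype>
#include <sstream>
#include <string>
#include <vector>

// Sketch of "trim && uppercase": strip surrounding whitespace, then uppercase.
std::string trimUppercase(std::string s) {
  auto notSpace = [](unsigned char c) { return !std::isspace(c); };
  s.erase(s.begin(), std::find_if(s.begin(), s.end(), notSpace));
  s.erase(std::find_if(s.rbegin(), s.rend(), notSpace).base(), s.end());
  std::transform(s.begin(), s.end(), s.begin(),
                 [](unsigned char c) { return std::toupper(c); });
  return s;
}

// Sketch of "split_by_space": break a full name into whitespace-separated parts.
std::vector<std::string> splitBySpace(const std::string& s) {
  std::istringstream in(s);
  std::vector<std::string> parts;
  for (std::string tok; in >> tok; ) parts.push_back(tok);
  return parts;
}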
Accessing Configuration in Code
Load and navigate the configuration via ConfigLoader:
#include "services/ConfigLoader.hpp"
using nlohmann::json;
int main() {
json cfg = ConfigLoader::load("config/config.json");
// Database config
auto db = cfg["database"];
std::string host = db["host"].get<std::string>();
// Load order
auto loadOrder = cfg["globalOptions"]["loadOrder"]
.get<std::vector<std::string>>();
// Table schemas
for (auto& [tableName, tblCfg] : cfg["tables"].items()) {
bool genPK = tblCfg.value("generatedPK", true);
// ...
}
// Transformations
json transforms = cfg["transformations"];
}
Use these fields with SchemaManager, InsertValues, and applyTransformation to drive schema creation, data insertion, and ETL logic.
ETL Workflow & Core Concepts
This section outlines how data flows through the three ETL stages—Extraction, Transformation, Loading—using the core utilities, simdjson parsing, transformation rules, and batch database inserts. Each stage highlights extension points and performance tips.
1. Extraction
Extract raw JSON payloads and pull record arrays or scalar fields.
Key Functions
- json extractData(const std::string& filePath)
  Uses nlohmann::json to parse a file into a json object.
- simdjson::dom::element extractDataSimDjson(const std::string& filePath)
  High-performance parsing for large files.
- std::vector<json> getJsonRecords(const json& data, const std::string& sourcePath, const std::string& rootPath = "")
  Recursively drills into dot-delimited paths to return array elements.
- std::string getValueFromJson(const json& record, const std::string& jsonPath)
  Retrieves a scalar field as a string; empty if missing.
Example: Extract Orders
#include "etl/Extraction.hpp"
#include <iostream>
int main() {
// Load JSON with simdjson for large file
auto dom = extractDataSimDjson("data/orders.json");
// Fallback to nlohmann::json for small config
json config = extractData("config/etl_config.json");
// Pull out an array at data.orders.items
auto items = getJsonRecords(
config, // using nlohmann::json here
"orders.items", // target array
"data" // nested under top-level "data"
);
for (auto& item : items) {
std::string id = getValueFromJson(item, "id");
std::string name = getValueFromJson(item, "name");
std::cout << "Item " << id << ": " << name << "\n";
}
}
Practical Tips
- Use extractDataSimDjson for multi-GB files and fall back to extractData for configs; a quick timing comparison is sketched after this list.
- Provide rootPath when your JSON nests the payload under a fixed key.
- Always check that the returned vector is non-empty before proceeding.
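If you are unsure which parser suits a dataset, a rough timing comparison is easy to put together. The sketch assumes only the two extraction functions above and uses one of the bundled sample files:
#include "etl/Extraction.hpp"
#include <nlohmann/json.hpp>
#include <chrono>
#include <iostream>

using nlohmann::json;

// Time both parsers on the same input and print the elapsed milliseconds.
int main() {
  const char* path = "data/datos.json";  // bundled sample file

  auto t0 = std::chrono::steady_clock::now();
  json viaNlohmann = extractData(path);
  auto t1 = std::chrono::steady_clock::now();
  auto dom = extractDataSimDjson(path);
  auto t2 = std::chrono::steady_clock::now();

  using ms = std::chrono::duration<double, std::milli>;
  std::cout << "nlohmann::json: " << ms(t1 - t0).count() << " ms\n"
            << "simdjson:       " << ms(t2 - t1).count() << " ms\n";
  (void)viaNlohmann; (void)dom;  // parsed documents unused in this benchmark
}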
2. Transformation
Apply field-level mutations driven by a JSON configuration.
Key Functions
- void applyTransformation(const std::string& value, const std::string& transformName, const json& transformsConfig)
  Handles single string transforms (split_by_space, trim_uppercase).
- void applyTransformations(json& record, const json& transformsConfig)
  Iterates over a record's fields and applies the mapped transformations.
Example: Normalize Records
#include "etl/Transformation.hpp"
#include "etl/Extraction.hpp"
#include <iostream>
int main() {
// Load raw records
json raw = extractData("data/users.json");
auto users = getJsonRecords(raw, "users");
// Transformation rules
json transforms = extractData("config/transforms.json");
// transforms = {
// { "username_trim", { {"logic","trim_uppercase"} } },
// { "fullname_split", { {"logic","split_by_space"} } }
// };
for (auto& user : users) {
// Mutates in place
applyTransformations(user, transforms);
std::cout << user.dump() << "\n";
}
}
Practical Tips
- Map field names to transform names in your ETL config.
- Extend by adding else if (logic == "your_logic") in Transformation.cpp and updating the JSON config.
3. Loading
Insert transformed records into PostgreSQL with batch operations and relationship handling.
Key Classes & Methods
- QueryExecutor
  Wraps a PGconn* and exposes executeQuery, lookupId, and selectQuery.
- InsertValues
  - batchInsert(const std::string& table, const std::vector<json>& records, const json& tableConfig)
    Performs a parameterized batch INSERT inside a transaction.
  - processRelationships(const json& relConfig)
    Establishes many-to-many links after inserts.
Example: Full ETL Pipeline
#include "etl/Extraction.hpp"
#include "etl/Transformation.hpp"
#include "database/QueryExecutor.hpp"
#include "database/InsertValues.hpp"
#include <libpq-fe.h>
#include <iostream>
int main() {
  // 1. DB setup
  PGconn* conn = PQconnectdb("dbname=mydb user=me");
  if (PQstatus(conn) != CONNECTION_OK) {
    std::cerr << "Connection failed: " << PQerrorMessage(conn) << "\n";
    PQfinish(conn);
    return 1;
  }
  QueryExecutor executor(conn);
  InsertValues loader(executor);
  // 2. Extraction
  json raw = extractData("data/products.json");
  auto records = getJsonRecords(raw, "products");
  // 3. Transformation
  json transforms = extractData("config/product_transforms.json");
  for (auto& rec : records) {
    applyTransformations(rec, transforms);
  }
  // 4. Loading
  json tableConfig = extractData("config/product_table.json");
  // tableConfig example:
  // { "batchSize": 500,
  //   "columns": [ { "name":"id","jsonPath":"$.id","type":"int" },
  //                { "name":"data","jsonPath":"$.info","type":"jsonb" } ] }
  try {
    loader.batchInsert("products", records, tableConfig);
    // Optional: process many-to-many relationships
    json relConfig = extractData("config/product_category_rel.json");
    loader.processRelationships(relConfig);
    std::cout << "ETL completed successfully\n";
  } catch (const std::exception& e) {
    std::cerr << "ETL failed: " << e.what() << "\n";
    PQfinish(conn);
    return 1;
  }
  PQfinish(conn);
  return 0;
}
Practical Tips
- Cache foreign-key lookups (via lookupId) in your own map to reduce queries; a memoization sketch follows this list.
- Tune batchSize to balance memory use against network overhead.
- Wrap the entire load in a transaction; on exception, batchInsert rolls back automatically.
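A memoization wrapper for the first tip could look like the sketch below; the lookupId(table, column, value) signature and int return type are assumptions made for illustration:
#include <string>
#include <unordered_map>

#include "database/QueryExecutor.hpp"

// Memoize foreign-key lookups so repeated natural keys hit the map, not the DB.
class IdCache {
public:
  explicit IdCache(QueryExecutor& exec) : exec_(exec) {}

  int get(const std::string& table, const std::string& column,
          const std::string& value) {
    std::string key = table + "|" + column + "|" + value;
    auto it = cache_.find(key);
    if (it != cache_.end()) return it->second;
    int id = exec_.lookupId(table, column, value);  // assumed signature
    cache_.emplace(key, id);
    return id;
  }

private:
  QueryExecutor& exec_;
  std::unordered_map<std::string, int> cache_;
};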
Extension & Optimization
- Custom Extraction: Extend Extraction.cpp to support XML or CSV parsing alongside JSON.
- New Transforms: Add logic blocks in Transformation.cpp and expose them in your JSON spec.
- Parallel Loading: Partition records by shard and run concurrent batchInsert calls on separate connections (sketched after this list).
- Utility Helpers: Leverage Utility.hpp for string joins, splits, and key–value map creation during transformation or lookup phases.
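The parallel-loading idea can be sketched with std::thread and one connection per worker; the connection string, table name, and worker count are placeholders, and error handling is omitted for brevity:
#include <algorithm>
#include <thread>
#include <vector>
#include <nlohmann/json.hpp>
#include <libpq-fe.h>

#include "database/QueryExecutor.hpp"
#include "database/InsertValues.hpp"

using nlohmann::json;

// Split records across N workers, each with its own connection and loader.
void parallelLoad(const std::vector<json>& records, const json& tableConfig,
                  const char* conninfo, std::size_t workers = 4) {
  std::vector<std::thread> pool;
  std::size_t chunk = (records.size() + workers - 1) / workers;

  for (std::size_t w = 0; w < workers; ++w) {
    std::size_t begin = w * chunk;
    if (begin >= records.size()) break;
    std::size_t end = std::min(begin + chunk, records.size());

    pool.emplace_back([&, begin, end] {
      PGconn* conn = PQconnectdb(conninfo);  // one connection per thread
      QueryExecutor exec(conn);
      InsertValues loader(exec);
      std::vector<json> slice(records.begin() + begin, records.begin() + end);
      loader.batchInsert("products", slice, tableConfig);
      PQfinish(conn);
    });
  }
  for (auto& t : pool) t.join();
}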
This workflow provides a modular ETL scaffold—swap in new parsers, enrich transformation logic, and scale loading by tuning batch sizes and caching. Use these core concepts to build robust, high-throughput pipelines.
Development & Contribution Guide
This guide helps contributors set up the project, follow coding standards, extend transformation logic, write tests, and submit pull requests.
1. Prerequisites & Setup
Clone the repo
git clone https://github.com/PerezO12/ETL_C-.git
cd ETL_C-
Bootstrap vcpkg and install dependencies
./vcpkg/bootstrap-vcpkg.sh
./vcpkg/vcpkg install nlohmann-json simdjson libpqxx libpq
Configure and build with CMake
mkdir build && cd build
cmake .. \
  -DCMAKE_TOOLCHAIN_FILE=../vcpkg/scripts/buildsystems/vcpkg.cmake \
  -DCMAKE_BUILD_TYPE=Release
cmake --build . --parallel
Verify executables
./bin/etl_processor --help
2. Project Layout
├── CMakeLists.txt # Top-level build config
├── vcpkg-configuration.json
├── include/ # Public headers
│ └── etl/
├── src/ # Core implementation
│ ├── Transformation.cpp
│ └── …
├── tests/ # Unit tests
│ └── TransformationTests.cpp
└── tools/ # Scripts, CI configs
3. Coding Standards
- Language: C++17
- Formatting:
  • 2-space indent
  • snake_case for files and functions
  • PascalCase for types/classes
- Headers: include guards or #pragma once
- Error Handling: use exceptions (e.g., std::runtime_error)
- Dependencies: reference via vcpkg; include <nlohmann/json.hpp>, <simdjson.h>, <pqxx/pqxx>
- Commits: follow Conventional Commits
4. Extending Transformation Functions
All transformations live in src/Transformation.cpp and are driven by JSON configs.
Example: Add a reverse_string transform
In Transformation.cpp, update applyTransformation:
if (logic == "reverse_string") {
  std::string out = value;
  std::reverse(out.begin(), out.end());
  return out;
}
// existing branches...
Update the JSON config (transformations.json):
{
  "reverse_string": { "logic": "reverse_string" }
}
Reference it in the field-ops map:
{ "username": ["trim_uppercase", "reverse_string"] }
Apply in code:
json transformDefs = /* load from transformations.json */;
json fieldOps = /* load field-ops config */;
applyTransformations(record, transformDefs);
applyTransformations(record, fieldOps);
5. Writing Tests
We use Catch2 (installed via vcpkg).
Install Catch2:
./vcpkg/vcpkg install catch2
Add a test target in the top-level CMakeLists.txt:
enable_testing()
find_package(Catch2 CONFIG REQUIRED)
add_executable(unit_tests
  tests/TransformationTests.cpp
)
target_link_libraries(unit_tests PRIVATE etl_core Catch2::Catch2)
add_test(NAME TransformationTests COMMAND unit_tests)
Example test (tests/TransformationTests.cpp):
#define CATCH_CONFIG_MAIN
#include <catch2/catch.hpp>
#include "Transformation.hpp"
using nlohmann::json;
TEST_CASE("trim_uppercase works", "[transform]") {
  REQUIRE(applyTransformation(" abc ", "trim_uppercase",
                              json{{"logic","trim_uppercase"}}) == "ABC");
}
Run tests:
cd build
ctest --output-on-failure
6. Submitting Pull Requests
Fork the repo and create a feature branch:
git checkout -b feat/add-reverse-transform
Implement code, add tests, update docs.
Run full build and tests locally.
Commit with a clear message:
feat(transform): add reverse_string transformation
Push and open a PR against main:
• Reference any related issue
• Ensure CI (build + ctest) passes
• Request reviews from maintainer team
Thank you for contributing!