Package Architecture¶
dbt-nexus follows a layered architecture designed for scalability, maintainability, and source-agnostic data processing. This document explains the core architectural concepts and data flow.
Architectural Overview¶
The package is organized into five primary layers that transform raw data into resolved, production-ready entities:
graph TD
A[Raw Data Layer] --> B[Source Event Log Layer]
B --> C[Core Event Log Layer]
C --> D[Identity Resolution Layer]
D --> E[Final Tables Layer]
style A fill:#dae8fc
style B fill:#ffe6cc
style C fill:#f8cecc
style D fill:#e1d5e7
style E fill:#d5e8d4
Layer 1: Raw Data¶
- Purpose: Direct ingestion from source systems with minimal transformation
- Characteristics:
- Preserves original data structure
- Applies incremental filtering based on synced_at
- Adds _ingested_at tracking
- Isolates downstream models from source changes
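As a sketch of what this layer looks like (the source and table names here are illustrative, not the package's actual models), a raw-layer model is typically a thin incremental pass-through:

-- Sketch of a raw-layer model; source/table names are hypothetical
{{ config(materialized='incremental', unique_key='id') }}

SELECT
    *,
    CURRENT_TIMESTAMP AS _ingested_at -- processing-time watermark
FROM {{ source('shopify_partner', 'transactions') }}
{% if is_incremental() %}
WHERE synced_at > (SELECT MAX(synced_at) FROM {{ this }})
{% endif %}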
Layer 2: Source Event Log¶
- Purpose: Normalize source-specific events and identifiers
- Characteristics:
- Translates raw data into standardized format
- Preserves source-specific details
- Follows consistent naming patterns
- Maintains source attribution
Layer 3: Core Event Log¶
- Purpose: Unified events with standardized schema across sources
- Characteristics:
- Unions data from multiple sources
- Standardized core fields only
- Source-agnostic processing
- Enables cross-source analysis
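For example, the core event log can be thought of as a union over each source's normalized events, keeping only the standardized fields (the model names below are illustrative):

-- Sketch: unioning normalized source events into the core event log
SELECT id, occurred_at, event_name, event_description, event_type, source
FROM {{ ref('shopify_partner_events') }}
UNION ALL
SELECT id, occurred_at, event_name, event_description, event_type, source
FROM {{ ref('gmail_events') }}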
Layer 4: Identity Resolution¶
- Purpose: Resolve identities and deduplicate entities across sources
- Characteristics:
- Graph-based identity resolution
- Recursive CTE algorithms
- Cross-source entity matching
- Maintains source provenance
Layer 5: Final Tables¶
- Purpose: Production-ready tables for application use
- Characteristics:
- Clean, resolved entities
- Latest attribute values
- Optimized for queries
- Ready for operational use
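For example, an application could read resolved persons straight from a final table (the schema and table names follow the configuration shown later in this document):

-- Example: read resolved person entities from the marts schema
SELECT entity_id, traits
FROM marts.nexus_entities
WHERE entity_type = 'person'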
Database Schema¶
The following diagram illustrates the complete database schema and the relationships between entities, tracing the flow from raw sources through identity resolution to the final tables, including all key fields.
Core Entities¶
Events¶
The central entity that captures "what happened" across all sources:
-- Core event structure
{
id: UUID,
occurred_at: TIMESTAMP,
event_name: STRING,
event_description: STRING,
event_type: STRING,
source: STRING,
value: NUMERIC (optional),
value_unit: STRING (optional)
}
Persons¶
Individual entities with identifiers and traits:
-- Entity structure (v0.3.0)
{
entity_id: UUID,
entity_type: STRING, -- 'person', 'group', or custom types
identifiers: {
email: STRING,
phone: STRING,
user_id: STRING
},
traits: {
name: STRING,
timezone: STRING,
title: STRING
}
}
Groups¶
Organizational entities (companies, accounts):
-- Group structure
{
group_id: UUID,
identifiers: {
domain: STRING,
company_id: STRING,
shopify_id: STRING
},
traits: {
name: STRING,
industry: STRING,
size: STRING
}
}
Memberships¶
Relationships between persons and groups:
-- Relationship structure (v0.3.0)
{
relationship_id: UUID,
entity_a_id: UUID,
entity_a_type: STRING, -- 'person', 'group', etc.
entity_b_id: UUID,
entity_b_type: STRING, -- 'person', 'group', etc.
relationship_type: STRING, -- 'membership', 'collaboration', etc.
entity_a_role: STRING,
entity_b_role: STRING,
occurred_at: TIMESTAMP,
source: STRING
}
Source-Agnostic Design¶
Adapter Pattern¶
dbt-nexus uses an adapter pattern that allows any data source to be integrated by implementing a standard interface:
# Source configuration
vars:
  nexus:
    sources:
      shopify_partner:
        enabled: true
        events: true
        entities: ["group"]
        relationships: false
      gmail:
        enabled: true
        events: true
        entities: ["person"]
        relationships: false
Interface Requirements¶
Each source must provide models following these specifications:
Events Model Interface¶
-- Required fields for source events
SELECT
id, -- Unique event identifier
occurred_at, -- Event timestamp
event_name, -- Specific event name
event_description, -- Human-readable description
event_type, -- Event category
source, -- Source system name
value, -- Optional numeric value
value_unit, -- Optional value unit
_ingested_at -- Processing timestamp
FROM source_data
Identifiers Model Interface¶
-- Required fields for identifiers
SELECT
id, -- Unique identifier record ID
event_id, -- Reference to source event
edge_id, -- Groups related identifiers
identifier_type, -- Type (email, phone, domain)
identifier_value, -- Actual identifier value
source, -- Source system
occurred_at, -- When collected
_ingested_at -- When processed
FROM source_data
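As an illustration, a Gmail-style source might satisfy this interface as follows (model and column names are assumptions, not the package's actual code):

-- Sketch: deriving identifier rows from a normalized source event model
SELECT
    {{ dbt_utils.generate_surrogate_key(['event_id', "'email'", 'from_email']) }} AS id,
    event_id,
    event_id AS edge_id, -- identifiers from one event share an edge
    'email' AS identifier_type,
    from_email AS identifier_value,
    'gmail' AS source,
    occurred_at,
    _ingested_at
FROM {{ ref('gmail_events') }}
WHERE from_email IS NOT NULL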
Traits Model Interface¶
-- Required fields for traits
SELECT
id, -- Unique trait record ID
event_id, -- Reference to source event
edge_id, -- Groups related traits
trait_name, -- Trait name
trait_value, -- Trait value
source, -- Source system
occurred_at, -- When collected
_ingested_at -- When processed
FROM source_data
Identity Resolution Process¶
Graph-Based Resolution¶
The identity resolution system uses graph algorithms to connect related identifiers:
- Edge Generation: Create connections between identifiers
- Edge Normalization: Eliminate directional duplicates (A→B = B→A)
- Connected Components: Find groups of connected identifiers
- Entity Assignment: Assign unique IDs to each connected component
Recursive CTE Implementation¶
WITH RECURSIVE identity_graph AS (
    -- Base case: direct identifier relationships
    SELECT
        identifier_a,
        identifier_b,
        1 AS depth
    FROM edges

    UNION ALL

    -- Recursive case: transitive relationships, capped by max_recursion
    SELECT
        ig.identifier_a,
        e.identifier_b,
        ig.depth + 1
    FROM identity_graph ig
    JOIN edges e ON ig.identifier_b = e.identifier_a
    WHERE ig.depth < {{ var('nexus', {}).get('max_recursion', 5) }}
)

SELECT identifier_a, identifier_b, depth
FROM identity_graph
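Entity assignment can then be sketched as labeling each identifier with the smallest identifier reachable from it. This is one common approach, not necessarily the package's exact implementation, and it assumes edges are materialized in both directions with a self-pair per identifier so that MIN() converges to a single label per connected component:

-- Sketch: connected-component labeling over the traversal above
SELECT
    identifier_b AS identifier,
    MIN(identifier_a) AS resolved_entity_key -- one label per component
FROM identity_graph
GROUP BY identifier_b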
Deduplication Strategy¶
The system handles various deduplication challenges:
- Order Independence: A→B treated same as B→A
- Temporal Awareness: Preserves earliest occurrence
- Source Attribution: Maintains provenance
- Incremental Processing: Only processes new data
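Order independence, for instance, is typically achieved by canonicalizing edge direction before deduplication (raw_edges is a stand-in name for the pre-normalized edges):

-- Sketch: canonicalize direction so A→B and B→A collapse to one row
SELECT DISTINCT
    LEAST(identifier_a, identifier_b) AS identifier_a,
    GREATEST(identifier_a, identifier_b) AS identifier_b
FROM raw_edges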
Performance Optimization¶
Incremental Processing¶
All models support incremental materialization:
{{ config(materialized='incremental', unique_key='id') }}
{% if is_incremental() %}
WHERE _ingested_at > (SELECT MAX(_ingested_at) FROM {{ this }})
{% endif %}
Per-Source Watermarking¶
When unioning multiple sources, the system applies per-source filtering:
-- Per-source watermark filtering prevents late-arriving source data from being dropped
{{ union_with_watermarks(sources_var='event_sources') }}
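Conceptually, each source is filtered against its own watermark rather than one global MAX(_ingested_at), so one fast-moving source cannot cause another source's late-arriving rows to be skipped. A simplified hand-written equivalent for a single source (the macro's internals are assumed, not quoted):

-- Sketch: per-source watermark instead of a global MAX(_ingested_at)
SELECT * FROM {{ ref('gmail_events') }}
{% if is_incremental() %}
WHERE _ingested_at > (
    SELECT COALESCE(MAX(_ingested_at), CAST('1900-01-01' AS TIMESTAMP))
    FROM {{ this }}
    WHERE source = 'gmail'
)
{% endif %}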
Materialization Strategy¶
- Development: Views for fast iteration
- Production: Tables for performance
- Large Tables: Incremental for efficiency
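One way to express the dev/prod split in dbt_project.yml (the target name check is an assumption about your profiles setup):

models:
  nexus:
    # Views in dev for fast iteration, tables elsewhere for performance
    +materialized: "{{ 'view' if target.name == 'dev' else 'table' }}"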
Cross-Database Compatibility¶
Database-Agnostic SQL¶
The package uses dbt's adapter pattern for cross-database compatibility:
{% macro generate_surrogate_key(field_list) %}
    {{ return(adapter.dispatch('generate_surrogate_key', 'nexus')(field_list)) }}
{% endmacro %}

{% macro default__generate_surrogate_key(field_list) %}
    -- Default implementation (delegates to dbt_utils)
    {{ dbt_utils.generate_surrogate_key(field_list) }}
{% endmacro %}

{% macro snowflake__generate_surrogate_key(field_list) %}
    -- Snowflake override point (identical to the default here)
    {{ dbt_utils.generate_surrogate_key(field_list) }}
{% endmacro %}
Supported Warehouses¶
- Snowflake
- BigQuery
- Postgres
- Redshift
- Databricks
Configuration Management¶
Variable-Driven Architecture¶
The package uses dbt variables for flexible configuration:
vars:
  nexus:
    max_recursion: 5 # Identity resolution depth limit
    entity_types: ["person", "group"]
    sources:
      gmail:
        enabled: true
        events: true
        entities: ["person", "group"]
        relationships: true
    override_incremental: false # Force full refresh in development
Schema Organization¶
Models are organized into logical schemas:
models:
  nexus:
    event_log:
      +schema: event_log
    identity_resolution:
      +schema: identity_resolution
    final_tables:
      +schema: marts
Extensibility¶
Adding New Sources¶
- Create source adapter models following the interface requirements above
- Add source configuration to the sources variable (see the example below)
- Run dbt deps and dbt run
- Core models automatically include the new source
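For example, enabling a hypothetical stripe source is just a new entry alongside the existing ones:

vars:
  nexus:
    sources:
      stripe: # hypothetical new source
        enabled: true
        events: true
        entities: ["person", "group"]
        relationships: false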
Custom Entity Types¶
The architecture supports custom entity types beyond persons/groups:
-- Example: Location entities
{% macro process_location_identifiers(source_model) %}
{{ process_identifiers('location', source_model) }}
{% endmacro %}
Macro Extensibility¶
Custom transformations can be added through the macro system:
-- Custom normalization logic (regex literal shown in BigQuery syntax;
-- other warehouses may need e.g. '[^0-9]')
{% macro normalize_phone_number(phone_field) %}
    REGEXP_REPLACE({{ phone_field }}, r'[^\d]', '')
{% endmacro %}
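A call site then looks like this (column and model names are illustrative):

SELECT {{ normalize_phone_number('raw_phone') }} AS phone
FROM {{ ref('gmail_events') }}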
Testing Strategy¶
Data Quality Tests¶
models:
  - name: nexus_entities
    tests:
      - unique:
          column_name: entity_id
      - not_null:
          column_name: entity_id
      - not_null:
          column_name: entity_type
    columns:
      - name: entity_type
        tests:
          - accepted_values:
              values: ["person", "group"]
      - name: email
        tests:
          - not_null
Integration Tests¶
The package includes comprehensive integration tests with sample data to validate:
- Identity resolution accuracy
- Cross-source data integration
- Performance characteristics
- Edge case handling
Related Documentation¶
- Identity Resolution Logic - Deep dive into resolution algorithms
- Data Flow - Detailed data processing flow
- Performance Considerations - Optimization strategies
- Getting Started - Implementation guide