Skip to content

Common Tasks

Setting Up a New Source

Step 1: Configure Source in dbt_project.yml

vars:
  nexus:
    sources:
      your_source_name:
        enabled: true
        events: true
        entities: ["person"]
        relationships: true

Step 2: Create Four-Layer Architecture

Base Layer - Raw Tables

models/sources/your_source/base/base_orders.sql:

select * from {{ source('your_source', 'orders') }}

models/sources/your_source/base/base_customers.sql:

select * from {{ source('your_source', 'customers') }}

Normalized Layer - Clean Business Entities

models/sources/your_source/normalized/your_source_orders.sql:

select
    o.order_id,
    o.customer_id,
    o.order_date,
    o.total_amount,
    c.customer_name,
    c.email,
    c.phone_number
from {{ ref('base_orders') }} o
left join {{ ref('base_customers') }} c
    on o.customer_id = c.customer_id

Intermediate Layer - Event-Specific Formatting

models/sources/your_source/intermediate/your_source_order_events.sql:

{{ config(materialized='table', tags=['event-processing']) }}

select
    {{ nexus.create_nexus_id('event', ['order_id', 'order_date']) }} as event_id,
    order_date as occurred_at,
    'order' as type,
    'order_placed' as event_name,
    'Order placed for ' || total_amount as event_description,
    'your_source' as source,

    -- Source-specific fields
    customer_id,
    order_id,
    total_amount,
    customer_name,
    email,
    phone_number

from {{ ref('your_source_orders') }}
where order_date is not null

models/sources/your_source/intermediate/your_source_order_person_identifiers.sql:

{{ config(materialized='table', tags=['identity-resolution']) }}

{{ nexus.unpivot_identifiers(
    model_name='your_source_order_events',
    event_id_field='event_id',
    edge_id_field='event_id',
    columns=['customer_id', 'email', 'phone_number'],
    additional_columns=['occurred_at', 'source'],
    column_to_identifier_type={
        'customer_id': 'customer_id',
        'email': 'email',
        'phone_number': 'phone'
    },
    role_column="'customer'"
) }}

Unioned Layer - Combined Models

models/sources/your_source/your_source_events.sql:

{{ config(materialized='table', tags=['event-processing']) }}

{{ dbt_utils.union_relations([
    ref('your_source_order_events'),
    ref('your_source_payment_events')
]) }}

order by occurred_at desc

Step 3: Test the Integration

dbt run --select source:your_source
dbt test --select source:your_source

Creating Custom States

Step 1: Create State Model

models/states/billing_lifecycle.sql:

select
    person_id,
    'trial' as state,
    trial_started_at as state_entered_at,
    trial_ended_at as state_exited_at,
    case when trial_ended_at is null then true else false end as is_current
from {{ ref('billing_events') }}
where trial_started_at is not null

union all

select
    person_id,
    'paid' as state,
    subscription_started_at as state_entered_at,
    subscription_ended_at as state_exited_at,
    case when subscription_ended_at is null then true else false end as is_current
from {{ ref('billing_events') }}
where subscription_started_at is not null

Step 2: Add to nexus_states Union

Update models/nexus-models/states/nexus_states.sql:

-- Add your new state model to the union
select * from {{ ref('billing_lifecycle') }}

Step 3: Test State Model

dbt run --select billing_lifecycle
dbt run --select nexus_states

Setting Up Alias Models

Step 1: Create Final Tables Directory

mkdir -p models/final-tables/links

Step 2: Create Alias Models

models/final-tables/persons.sql:

select * from {{ ref('nexus_persons') }}

models/final-tables/groups.sql:

select * from {{ ref('nexus_groups') }}

models/final-tables/events.sql:

select * from {{ ref('nexus_events') }}

models/final-tables/states.sql:

select * from {{ ref('nexus_states') }}

models/final-tables/links/memberships.sql:

select * from {{ ref('nexus_memberships') }}

models/final-tables/links/person_identifiers.sql:

select * from {{ ref('nexus_person_identifiers') }}

Configuring Schema Organization

Step 1: Update dbt_project.yml

models:
  your_project_name:
    final-tables:
      +schema: nexus_final_tables
    sources:
      +schema: nexus_sources
      +tags: ["nexus"]

  nexus:
    nexus-models:
      final-tables:
        +schema: nexus_final_tables
      states:
        +schema: nexus_final_tables
      identity-resolution:
        +schema: nexus_identity_resolution
      event-log:
        +schema: nexus_event_log
        nexus_events:
          +schema: nexus_final_tables

Step 2: Test Configuration

dbt run --select package:nexus
dbt docs generate

Running Demo Data

Step 1: Build Demo Data

# From your dbt project directory
dbt build

Step 2: Explore Demo Data

-- View all demo events
SELECT * FROM nexus_demo_data.nexus_events
ORDER BY occurred_at DESC;

-- View resolved persons
SELECT * FROM nexus_demo_data.nexus_persons;

-- View group memberships
SELECT
    p.name as person_name,
    g.name as group_name,
    m.role
FROM nexus_demo_data.nexus_memberships m
JOIN nexus_demo_data.nexus_persons p ON m.person_id = p.id
JOIN nexus_demo_data.nexus_groups g ON m.group_id = g.id;

Debugging Identity Resolution

Step 1: Check Edge Creation

-- View identity edges
SELECT * FROM nexus_identity_resolution.nexus_person_identifiers_edges
ORDER BY created_at DESC;

Step 2: Verify Recursion Settings

# Check your dbt_project.yml
vars:
  nexus_max_recursion: 5 # Adjust if needed

Step 3: Test with Smaller Dataset

# Run with specific source
dbt run --select source:your_source
dbt run --select package:nexus --models tag:identity-resolution

Performance Optimization

Step 1: Adjust Recursion Depth

vars:
  nexus_max_recursion: 3 # Reduce for better performance

Step 2: Configure Incremental Models

models:
  nexus:
    nexus-models:
      event-log:
        +materialized: incremental
        +incremental_strategy: merge

Step 3: Add Partitioning (BigQuery)

models:
  nexus:
    nexus-models:
      event-log:
        +partition_by:
          field: occurred_at
          data_type: timestamp
          granularity: day

Testing Your Implementation

Step 1: Run All Models

dbt run
dbt test

Step 2: Generate Documentation

dbt docs generate
dbt docs serve

Step 3: Verify Data Quality

-- Check for duplicate persons
SELECT
    email,
    COUNT(*) as count
FROM nexus_final_tables.nexus_persons
GROUP BY email
HAVING COUNT(*) > 1;

-- Check event timeline
SELECT
    MIN(occurred_at) as earliest_event,
    MAX(occurred_at) as latest_event,
    COUNT(*) as total_events
FROM nexus_final_tables.nexus_events;