LEVEL 07

DE Pipeline Patterns

SCD, MERGE, incremental loads, dedup, pivot, JSON. The SQL that powers dbt models and Airflow tasks.

SCD Type 1 — Overwrite (no history)

Just update the record. Old value is gone. Use when history doesn't matter — correcting typos, wrong phone numbers.

-- Oracle MERGE — SCD Type 1
MERGE INTO accounts tgt
USING accounts_staging src ON (tgt.account_id = src.account_id)
WHEN MATCHED THEN
    UPDATE SET tgt.city    = src.city,
               tgt.segment = src.segment,
               tgt.updated_at = SYSDATE
    WHERE tgt.city <> src.city OR tgt.segment <> src.segment
WHEN NOT MATCHED THEN
    INSERT (account_id, customer, city, segment, created_at)
    VALUES (src.account_id, src.customer, src.city, src.segment, SYSDATE);

SCD Type 2 — Full history

New row for every change. Old row gets an end date. Every serious warehouse uses this.

-- Table structure: one row per version of each account
account_id | city    | segment | effective_from | effective_to | is_current
ACC003     | Delhi   | RETAIL  | 2020-01-01     | 2022-05-31   | 0
ACC003     | Pune    | RETAIL  | 2022-06-01     | 2024-02-28   | 0
ACC003     | Mumbai  | HNI     | 2024-03-01     | 9999-12-31   | 1  ← current
-- Step 1: expire old rows for changed accounts
UPDATE accounts_dim tgt
SET    effective_to = TRUNC(SYSDATE) - 1, is_current = 0
WHERE  is_current = 1
AND EXISTS (
    SELECT 1 FROM accounts_staging src
    WHERE  src.account_id = tgt.account_id
    AND   (src.city <> tgt.city OR src.segment <> tgt.segment)
);

-- Step 2: insert new current rows
INSERT INTO accounts_dim (surrogate_key, account_id, city, segment,
                            effective_from, effective_to, is_current)
SELECT accounts_dim_seq.NEXTVAL,
       src.account_id, src.city, src.segment,
       TRUNC(SYSDATE), DATE '9999-12-31', 1
FROM   accounts_staging src
WHERE NOT EXISTS (
    SELECT 1 FROM accounts_dim tgt
    WHERE  tgt.account_id = src.account_id
    AND    tgt.is_current = 1
    AND    tgt.city = src.city AND tgt.segment = src.segment
);
-- Point-in-time query: what did account ACC003 look like on 2023-01-01?
SELECT account_id, city, segment
FROM   accounts_dim
WHERE  effective_from <= DATE '2023-01-01'
AND    effective_to   >  DATE '2023-01-01';

MERGE / UPSERT — cross-database

-- PostgreSQL: INSERT ... ON CONFLICT
INSERT INTO transactions_target
    (txn_id, account_id, amount, status)
SELECT txn_id, account_id, amount, status
FROM   transactions_staging
ON CONFLICT (txn_id)
DO UPDATE SET
    status     = EXCLUDED.status,
    amount     = EXCLUDED.amount,
    updated_at = CURRENT_TIMESTAMP
WHERE  transactions_target.status <> EXCLUDED.status;

-- Skip duplicates silently
ON CONFLICT (txn_id) DO NOTHING;

-- MySQL
INSERT INTO target (txn_id, amount, status)
SELECT txn_id, amount, status FROM staging
ON DUPLICATE KEY UPDATE
    status = VALUES(status), amount = VALUES(amount);

Incremental load — watermark pattern

-- Control table stores last loaded timestamp
pipeline_name | last_loaded_at      | rows_loaded | status
TXN_DAILY     | 2024-03-14 23:59:00 | 48291       | SUCCESS

-- Load only rows newer than the watermark
INSERT INTO transactions_warehouse
SELECT txn_id, account_id, txn_date, amount, status
FROM   transactions_source
WHERE  created_at > (SELECT last_loaded_at FROM pipeline_control
                      WHERE  pipeline_name = 'TXN_DAILY')
AND    created_at <= SYSDATE;      -- safety ceiling

-- Update watermark after successful load
UPDATE pipeline_control
SET    last_loaded_at = SYSDATE, rows_loaded = SQL%ROWCOUNT, status = 'SUCCESS'
WHERE  pipeline_name = 'TXN_DAILY';

Deduplication — the ROW_NUMBER pattern

-- Keep only the most recent row per txn_id (memorise this pattern)
WITH deduped AS (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY txn_id       -- what defines a "duplicate"
               ORDER BY     created_at DESC  -- which one to keep
           ) AS rn
    FROM   transactions_staging
)
INSERT INTO transactions_clean
SELECT txn_id, account_id, amount, status, branch, txn_date
FROM   deduped
WHERE  rn = 1;

Pivot & Unpivot

-- CASE WHEN pivot — works in all databases
SELECT branch,
       SUM(CASE WHEN TO_CHAR(txn_date,'MON')='JAN' THEN amount END) AS jan,
       SUM(CASE WHEN TO_CHAR(txn_date,'MON')='FEB' THEN amount END) AS feb,
       SUM(CASE WHEN TO_CHAR(txn_date,'MON')='MAR' THEN amount END) AS mar
FROM   transactions
WHERE  status = 'SUCCESS' AND EXTRACT(YEAR FROM txn_date) = 2024
GROUP BY branch;

-- Oracle native PIVOT
SELECT * FROM (
    SELECT branch, TO_CHAR(txn_date,'MON') AS mth, amount
    FROM   transactions WHERE status='SUCCESS'
) PIVOT (
    SUM(amount) FOR mth IN ('JAN' AS jan, 'FEB' AS feb, 'MAR' AS mar)
);

JSON parsing

-- Oracle: extract scalar fields
-- payload: {"customer":{"tier":"HNI"},"device":"mobile","risk_score":72}
SELECT txn_id,
       JSON_VALUE(payload, '$.customer.tier')              AS tier,
       JSON_VALUE(payload, '$.device')                     AS device,
       JSON_VALUE(payload, '$.risk_score' RETURNING NUMBER) AS risk_score
FROM   transactions_raw
WHERE  JSON_VALUE(payload, '$.risk_score' RETURNING NUMBER) > 70;

-- PostgreSQL JSONB operators
SELECT txn_id,
       payload ->> 'device'              AS device,       -- ->> returns TEXT
       payload -> 'customer' ->> 'tier'  AS tier,        -- -> returns JSON
       (payload ->> 'risk_score')::numeric AS risk_score
FROM   transactions_raw
WHERE  (payload ->> 'risk_score')::numeric > 70;

-- Explode JSON array into rows (PostgreSQL)
SELECT t.txn_id,
       prod ->> 'code'    AS product_code,
       prod ->> 'amount'  AS product_amount
FROM   transactions_raw t,
       jsonb_array_elements(t.payload -> 'products') AS prod;

Practice Problems

Q1 · Easy
Write a MERGE (Oracle) that loads accounts_staging into accounts_target using SCD Type 1. Match on account_id. Update city and segment only if they actually changed. Insert new accounts.
Q2 · Medium
An account's segment changed RETAIL → HNI. Write both SCD Type 2 steps: (1) expire the old row, (2) insert the new current row. Then write a query showing the full history of that account in date order.
Q3 · Medium
Design a complete incremental load pipeline: control table with watermark, load new rows, handle late arrivals (up to 3 days old), update watermark. What happens if the pipeline fails halfway? How do you make it idempotent?
Q4 · Hard
Staging has duplicates (same txn_id 2-3× from API retries). Some txn_ids already exist in target with different status. Write one SQL block: CTE to dedup staging (keep latest by created_at), then MERGE the deduped result into target.
Q5 · Hard
Complete daily pipeline as one SQL script with CTEs: dedup staging → identify new accounts → identify changed accounts (SCD2) → identify unchanged → INSERT new → UPDATE expire changed → INSERT new current rows → SELECT audit summary (how many new, changed, unchanged).