SCD, MERGE, incremental loads, dedup, pivot, JSON. The SQL that powers dbt models and Airflow tasks.
SCD Type 1: just update the record in place — the old value is gone. Use it when history doesn't matter, e.g. correcting typos or wrong phone numbers.
-- Oracle MERGE — SCD Type 1: overwrite in place, no history kept.
MERGE INTO accounts tgt
USING accounts_staging src ON (tgt.account_id = src.account_id)
WHEN MATCHED THEN
UPDATE SET tgt.city = src.city,
           tgt.segment = src.segment,
           tgt.updated_at = SYSDATE
-- DECODE treats two NULLs as equal, so a NULL city/segment change no longer
-- slips past the filter (plain <> yields UNKNOWN whenever either side is NULL)
WHERE DECODE(tgt.city, src.city, 0, 1) = 1
   OR DECODE(tgt.segment, src.segment, 0, 1) = 1
WHEN NOT MATCHED THEN
INSERT (account_id, customer, city, segment, created_at)
VALUES (src.account_id, src.customer, src.city, src.segment, SYSDATE);
SCD Type 2: a new row for every change; the old row gets an end date. Every serious warehouse uses this.
-- Table structure: one row per version of each account
account_id | city | segment | effective_from | effective_to | is_current
ACC003 | Delhi | RETAIL | 2020-01-01 | 2022-05-31 | 0
ACC003 | Pune | RETAIL | 2022-06-01 | 2024-02-28 | 0
ACC003 | Mumbai | HNI | 2024-03-01 | 9999-12-31 | 1 ← current
-- Step 1: expire old rows for changed accounts.
-- Closed-interval convention: effective_to = the day BEFORE the new version starts.
UPDATE accounts_dim tgt
SET effective_to = TRUNC(SYSDATE) - 1, is_current = 0
WHERE is_current = 1
AND EXISTS (
    SELECT 1 FROM accounts_staging src
    WHERE src.account_id = tgt.account_id
    -- DECODE is a NULL-safe inequality check: plain <> misses NULL -> value
    -- (and value -> NULL) flips, leaving stale rows marked current
    AND (DECODE(src.city, tgt.city, 0, 1) = 1
         OR DECODE(src.segment, tgt.segment, 0, 1) = 1)
);
-- Step 2: insert new current rows (brand-new accounts plus new versions of
-- changed ones — both fail the NOT EXISTS check below).
INSERT INTO accounts_dim (surrogate_key, account_id, city, segment,
                          effective_from, effective_to, is_current)
SELECT accounts_dim_seq.NEXTVAL,
       src.account_id, src.city, src.segment,
       TRUNC(SYSDATE), DATE '9999-12-31', 1
FROM accounts_staging src
WHERE NOT EXISTS (
    SELECT 1 FROM accounts_dim tgt
    WHERE tgt.account_id = src.account_id
    AND tgt.is_current = 1
    -- NULL-safe equality: DECODE returns 1 when both args are NULL, so an
    -- unchanged NULL column no longer triggers a spurious duplicate version
    -- (plain = yields UNKNOWN, making NOT EXISTS true)
    AND DECODE(tgt.city, src.city, 1, 0) = 1
    AND DECODE(tgt.segment, src.segment, 1, 0) = 1
);
-- Point-in-time query: what did account ACC003 look like on 2023-01-01?
-- effective_to is INCLUSIVE here (the expire step sets it to the day before
-- the next version starts), so the upper bound must be >=, not >.
-- With >, querying exactly the effective_to date of a row returns nothing.
SELECT account_id, city, segment
FROM accounts_dim
WHERE effective_from <= DATE '2023-01-01'
AND effective_to >= DATE '2023-01-01';
-- PostgreSQL upsert: INSERT ... ON CONFLICT
INSERT INTO transactions_target
(txn_id, account_id, amount, status)
SELECT txn_id, account_id, amount, status
FROM transactions_staging
ON CONFLICT (txn_id)
DO UPDATE SET
status = EXCLUDED.status,
amount = EXCLUDED.amount,
updated_at = CURRENT_TIMESTAMP
-- IS DISTINCT FROM is NULL-safe, and checking both columns means an
-- amount-only change is no longer silently skipped (the SET updates amount,
-- so the guard must watch amount too)
WHERE transactions_target.status IS DISTINCT FROM EXCLUDED.status
   OR transactions_target.amount IS DISTINCT FROM EXCLUDED.amount;
-- Skip duplicates silently
ON CONFLICT (txn_id) DO NOTHING;
-- MySQL upsert: ON DUPLICATE KEY UPDATE
-- NOTE(review): VALUES() inside ON DUPLICATE KEY UPDATE is deprecated since
-- MySQL 8.0.20 in favour of a row alias (INSERT ... AS new ... UPDATE
-- status = new.status) — confirm target server version before modernising.
INSERT INTO target (txn_id, amount, status)
SELECT txn_id, amount, status FROM staging
ON DUPLICATE KEY UPDATE
status = VALUES(status), amount = VALUES(amount);
-- Control table stores last loaded timestamp
pipeline_name | last_loaded_at | rows_loaded | status
TXN_DAILY | 2024-03-14 23:59:00 | 48291 | SUCCESS
-- Load only rows newer than the watermark.
-- Explicit column list: INSERT without one breaks silently when either
-- table's schema changes.
INSERT INTO transactions_warehouse
(txn_id, account_id, txn_date, amount, status)
SELECT txn_id, account_id, txn_date, amount, status
FROM transactions_source
WHERE created_at > (SELECT last_loaded_at FROM pipeline_control
                    WHERE pipeline_name = 'TXN_DAILY')
AND created_at <= SYSDATE; -- safety ceiling: exclude rows still being written
-- Update watermark after successful load.
-- NOTE(review): SQL%ROWCOUNT is PL/SQL — this only works inside a PL/SQL
-- block immediately after the load INSERT; confirm execution context.
-- NOTE(review): SYSDATE here evaluates LATER than the SYSDATE ceiling used in
-- the load query, so rows created between the two are above the old ceiling
-- but below the new watermark — they are skipped forever. Capture one ceiling
-- timestamp (e.g. into a variable) and use it in both statements.
UPDATE pipeline_control
SET last_loaded_at = SYSDATE, rows_loaded = SQL%ROWCOUNT, status = 'SUCCESS'
WHERE pipeline_name = 'TXN_DAILY';
-- Keep only the most recent row per txn_id (memorise this pattern).
WITH deduped AS (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY txn_id       -- what defines a "duplicate"
               ORDER BY created_at DESC  -- which one to keep
               -- NOTE(review): ties on created_at pick an arbitrary winner —
               -- add a unique tiebreaker column here if one exists
           ) AS rn
    FROM transactions_staging
)
-- Explicit column list so the INSERT keeps working if either table grows a column
INSERT INTO transactions_clean (txn_id, account_id, amount, status, branch, txn_date)
SELECT txn_id, account_id, amount, status, branch, txn_date
FROM deduped
WHERE rn = 1;
-- CASE WHEN pivot — works in all databases.
-- EXTRACT(MONTH ...) is ANSI; TO_CHAR(...,'MON') is Oracle-only AND its
-- month abbreviations depend on the session's NLS language, so comparing
-- against 'JAN' silently matches nothing on non-English sessions.
SELECT branch,
       SUM(CASE WHEN EXTRACT(MONTH FROM txn_date) = 1 THEN amount END) AS jan,
       SUM(CASE WHEN EXTRACT(MONTH FROM txn_date) = 2 THEN amount END) AS feb,
       SUM(CASE WHEN EXTRACT(MONTH FROM txn_date) = 3 THEN amount END) AS mar
FROM transactions
WHERE status = 'SUCCESS' AND EXTRACT(YEAR FROM txn_date) = 2024
GROUP BY branch;
-- Oracle native PIVOT
SELECT * FROM (
    -- Pin the month language: plain TO_CHAR(...,'MON') follows session NLS
    -- settings, so the 'JAN'/'FEB'/'MAR' literals below would not match on
    -- non-English sessions
    SELECT branch,
           TO_CHAR(txn_date, 'MON', 'NLS_DATE_LANGUAGE=ENGLISH') AS mth,
           amount
    FROM transactions WHERE status='SUCCESS'
) PIVOT (
    SUM(amount) FOR mth IN ('JAN' AS jan, 'FEB' AS feb, 'MAR' AS mar)
);
-- Oracle: extract scalar fields from a JSON payload.
-- payload: {"customer":{"tier":"HNI"},"device":"mobile","risk_score":72}
-- Inline view extracts once, so the WHERE clause can filter on the alias
-- instead of repeating the JSON_VALUE call.
SELECT txn_id, tier, device, risk_score
FROM (
    SELECT txn_id,
           JSON_VALUE(payload, '$.customer.tier') AS tier,
           JSON_VALUE(payload, '$.device') AS device,
           JSON_VALUE(payload, '$.risk_score' RETURNING NUMBER) AS risk_score
    FROM transactions_raw
)
WHERE risk_score > 70;
-- PostgreSQL JSONB operators:
--   ->> returns TEXT, -> returns JSONB,
--   #>> walks a whole path and returns TEXT (same as chaining -> then ->>)
SELECT txn_id,
       payload ->> 'device' AS device,
       payload #>> '{customer,tier}' AS tier,
       (payload ->> 'risk_score')::numeric AS risk_score
FROM transactions_raw
WHERE (payload ->> 'risk_score')::numeric > 70;
-- Explode JSON array into rows (PostgreSQL).
-- CROSS JOIN LATERAL makes the row-multiplying intent explicit; the bare
-- comma join it replaces is the implicit-join antipattern and easy to misread.
SELECT t.txn_id,
       prod ->> 'code' AS product_code,
       prod ->> 'amount' AS product_amount -- ->> yields TEXT; cast downstream if needed
FROM transactions_raw t
CROSS JOIN LATERAL jsonb_array_elements(t.payload -> 'products') AS prod;