By Santosh Pradhan, MarTech Solutions Architect · Munich, Germany
The Golden Record Framework establishes the principles for field-level Customer 360 resolution. This post is the implementation guide — exact steps, working code, and source-system-specific extraction logic for building it on Databricks Delta Lake with Hightouch as the activation layer. By the end you will have a running pipeline that ingests customer data from Salesforce, SAP, Shopify, Segment, and Zendesk; resolves field-level conflicts correctly; and syncs the golden record back to every destination your teams need.
Architecture Overview
The pipeline follows a five-zone medallion architecture within Databricks Unity Catalog. Raw source data lands in Bronze without transformation. Field-level change events are extracted and normalised in Silver, where the attribute history table lives and the resolution algorithm runs. The resolved golden record is materialised in Gold. Hightouch reads from Gold and syncs to activation destinations via reverse ETL. A configuration layer — the attribute name registry — sits in Silver and governs the mapping from source field names to canonical names throughout.
Source systems connect through Fivetran for managed connectors (Salesforce, SAP, Shopify, Zendesk) and through Segment webhooks or Kafka for streaming event sources. All Bronze tables have Delta Change Data Feed enabled, which makes incremental Silver processing efficient without full table scans.
Prerequisites
Before starting: a Databricks workspace with Unity Catalog enabled; a Fivetran account with connectors licensed for Salesforce, SAP, Shopify, and Zendesk; a Segment source with HTTP webhook or S3 destination configured; and a Hightouch workspace connected to Databricks as a data source. All SQL below runs on Databricks SQL or in a DLT pipeline notebook.
Step 1 — Set Up the Unity Catalog Schemas
Create four schemas under a single catalog. Using a dedicated catalog keeps Customer 360 assets isolated from other data products.
CREATE CATALOG IF NOT EXISTS c360;
CREATE SCHEMA IF NOT EXISTS c360.bronze; -- raw source tables, append-only
CREATE SCHEMA IF NOT EXISTS c360.silver; -- field-change history, resolution
CREATE SCHEMA IF NOT EXISTS c360.gold; -- serving layer, activation-ready
CREATE SCHEMA IF NOT EXISTS c360.config; -- attribute name registry, source priority
Delta table properties do not cascade from the catalog, so enable Change Data Feed through the session default instead. Every Delta table created afterwards in that session inherits it; pre-existing tables still need the per-table ALTER shown in Step 3:
SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
Step 2 — Build the Configuration Tables
The attribute name registry maps source field names to canonical ODM-aligned names. The source priority table defines which system wins when timestamps are equal. These two tables are the only place where business decisions live — everything else in the pipeline is mechanical.
-- Attribute name registry: source field → canonical name
CREATE TABLE IF NOT EXISTS c360.config.attribute_name_registry (
  source_system STRING NOT NULL,
  source_field STRING NOT NULL,
  canonical_name STRING NOT NULL, -- ODM-aligned: email, firstName, phone, etc.
  registry_version STRING NOT NULL DEFAULT 'v1',
  is_active BOOLEAN NOT NULL DEFAULT TRUE
)
-- DEFAULT column values require this Delta table feature
TBLPROPERTIES ('delta.feature.allowColumnDefaults' = 'supported');
-- Seed with mappings for each source system
INSERT INTO c360.config.attribute_name_registry VALUES
-- Salesforce CRM
('salesforce', 'Email', 'email', 'v1', true),
('salesforce', 'FirstName', 'firstName', 'v1', true),
('salesforce', 'LastName', 'lastName', 'v1', true),
('salesforce', 'Phone', 'phone', 'v1', true),
('salesforce', 'MobilePhone', 'mobilePhone', 'v1', true),
('salesforce', 'MailingCity', 'city', 'v1', true),
('salesforce', 'MailingCountry','country', 'v1', true),
-- SAP ERP
('sap', 'SMTP_ADDR', 'email', 'v1', true),
('sap', 'NAME_FIRST', 'firstName', 'v1', true),
('sap', 'NAME_LAST', 'lastName', 'v1', true),
('sap', 'TEL1_NUMBR', 'phone', 'v1', true),
('sap', 'CITY1', 'city', 'v1', true),
('sap', 'COUNTRY', 'country', 'v1', true),
-- Shopify
('shopify', 'email', 'email', 'v1', true),
('shopify', 'first_name', 'firstName', 'v1', true),
('shopify', 'last_name', 'lastName', 'v1', true),
('shopify', 'phone', 'phone', 'v1', true),
('shopify', 'accepts_marketing', 'emailConsent', 'v1', true),
-- Segment (traits are already snake_case, map to canonical)
('segment', 'email', 'email', 'v1', true),
('segment', 'first_name', 'firstName', 'v1', true),
('segment', 'last_name', 'lastName', 'v1', true),
('segment', 'phone', 'phone', 'v1', true),
-- Zendesk
('zendesk', 'email', 'email', 'v1', true),
('zendesk', 'name', 'fullName', 'v1', true),
('zendesk', 'phone', 'phone', 'v1', true);
-- Source priority: lower number = higher trust
CREATE TABLE IF NOT EXISTS c360.config.source_priority (
source_system STRING NOT NULL,
priority INTEGER NOT NULL, -- 1 = highest trust
notes STRING
);
INSERT INTO c360.config.source_priority VALUES
('salesforce', 1, 'CRM is master for contact data post-sales'),
('sap', 2, 'ERP is master for billing address and account data'),
('shopify', 3, 'Commerce is master for email consent and purchase contact'),
('zendesk', 4, 'Support portal self-service updates'),
('segment', 5, 'Behavioural tracking — lowest trust for PII fields');
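Mechanically, the two config tables reduce to simple lookups. A pure-Python sketch of the registry lookup (the dictionary mirrors a subset of the seed rows above; in the pipeline this is a DataFrame join against the Delta table, not a dict):

```python
from typing import Optional

# In-memory mirror of a subset of the attribute name registry seed rows.
# Illustrative only; the pipeline joins c360.config.attribute_name_registry.
REGISTRY = {
    ("salesforce", "Email"): "email",
    ("salesforce", "FirstName"): "firstName",
    ("sap", "SMTP_ADDR"): "email",
    ("sap", "NAME_FIRST"): "firstName",
    ("shopify", "accepts_marketing"): "emailConsent",
}

def canonicalize(source_system: str, source_field: str) -> Optional[str]:
    """Map a source field to its canonical ODM-aligned name, or None when
    the field is not registered (unregistered fields are dropped, not guessed)."""
    return REGISTRY.get((source_system, source_field))
```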
Step 3 — Connect Source Systems and Build Bronze Tables
Salesforce CRM (Fivetran → Tier 2 CDC)
Configure the Fivetran Salesforce connector to sync the Contact and Account objects into c360.bronze.salesforce_contact. Fivetran's Salesforce connector uses the Bulk API and writes a _fivetran_synced timestamp and a _fivetran_deleted flag. Enable CDF on arrival:
-- Fivetran writes to this table automatically; confirm CDF is on
ALTER TABLE c360.bronze.salesforce_contact
SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');
-- Key columns Fivetran provides:
-- Id, Email, FirstName, LastName, Phone, MobilePhone,
-- MailingCity, MailingCountry, LastModifiedDate,
-- _fivetran_synced, _fivetran_deleted
SAP ERP (Fivetran SAP ECC/S4HANA → Tier 2)
The Fivetran SAP connector replicates the KNA1 (general customer master) and ADR6 (email address) tables. SAP does not emit field-level timestamps — only LAEDA (last change date) at the record level — making it a classic Tier 2 source.
ALTER TABLE c360.bronze.sap_kna1
SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');
-- Key columns from KNA1 + ADR6 join:
-- KUNNR (customer_id), NAME_FIRST, NAME_LAST, CITY1, COUNTRY,
-- TEL1_NUMBR, SMTP_ADDR (from ADR6), LAEDA (last change date)
Shopify / Magento (Fivetran → Tier 2, with webhook upgrade path)
The Fivetran Shopify connector syncs the customer table with updated_at at the record level. For higher-fidelity field tracking, configure Shopify webhooks for customers/update events to land in a Bronze streaming table via an AWS Lambda or Databricks Auto Loader from S3. The webhook payload carries the full customer object including updated_at, but still not per-field timestamps — treat as Tier 2.
ALTER TABLE c360.bronze.shopify_customer
SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');
-- Key columns: id, email, first_name, last_name, phone,
-- accepts_marketing, accepts_marketing_updated_at,
-- updated_at, _fivetran_synced
Note that Shopify does provide accepts_marketing_updated_at as a field-level timestamp for consent — use it as source_event_time for the emailConsent attribute instead of the record-level updated_at. This is a Tier 1 attribute embedded in an otherwise Tier 2 source.
Segment Tracking (Tier 1 — field-level events natively)
Configure the Segment Databricks destination to write identify calls directly to c360.bronze.segment_identifies. Each identify() call is already a field-level event: it carries a userId, a traits JSON object containing only the fields that changed, and a precise timestamp from the source event.
-- Segment Databricks destination schema (auto-created by Segment):
-- message_id, user_id, anonymous_id, timestamp (source event time),
-- received_at (ingestion time), traits (variant/map)
ALTER TABLE c360.bronze.segment_identifies
SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');
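Conceptually, each identify call fans out into one field-level event per trait, all sharing the call's timestamp. A pure-Python sketch of that fan-out (the helper name is hypothetical; Step 5 does the same thing in PySpark with explode):

```python
from typing import Any, Dict, List

def identify_to_events(identify: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Fan an identify() payload out into one field-level event per trait.
    Hypothetical helper for illustration; Step 5 implements this in PySpark."""
    return [
        {
            "source_customer_id": identify["user_id"],
            "source_field": field,
            "attribute_value": str(value),
            "source_event_time": identify["timestamp"],  # source event time
            "source_event_id": identify["message_id"],
        }
        for field, value in identify.get("traits", {}).items()
        if value is not None  # absent traits mean "no change", emit nothing
    ]
```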
Zendesk (Fivetran → Tier 2)
ALTER TABLE c360.bronze.zendesk_users
SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');
-- Key columns: id, email, name, phone, updated_at,
-- _fivetran_synced, _fivetran_deleted
Step 4 — Build the Silver Attribute History Table
This is the single most important table in the pipeline. Everything upstream feeds into it; everything downstream reads from it.
CREATE TABLE IF NOT EXISTS c360.silver.customer_attribute_history (
customer_id STRING NOT NULL,
attribute_name STRING NOT NULL, -- canonical name from registry
attribute_value STRING,
effective_from TIMESTAMP NOT NULL,
effective_to TIMESTAMP,
is_current BOOLEAN NOT NULL,
source_system STRING NOT NULL,
source_event_id STRING,
source_priority INTEGER NOT NULL,
ingestion_time TIMESTAMP NOT NULL
)
USING DELTA
TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
CLUSTER BY (customer_id, attribute_name);
Step 5 — Extract Field-Level Changes per Source
Each source has a dedicated extraction notebook that runs incrementally by reading the table's Delta Change Data Feed. The output of every notebook is a standardised staging DataFrame that feeds into the MERGE in Step 6.
Salesforce — CDF Replay
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Read only new changes since last checkpoint
sf_changes = spark.read.format("delta") \
.option("readChangeData", "true") \
.option("startingVersion", get_last_version("salesforce_contact")) \
.table("c360.bronze.salesforce_contact") \
.filter(F.col("_change_type").isin(["insert", "update_postimage"]))
# Load registry for Salesforce
registry = spark.table("c360.config.attribute_name_registry") \
.filter("source_system = 'salesforce' AND is_active = true") \
.select("source_field", "canonical_name")
# Unpivot: one row per changed field per record
sf_fields = ["Email", "FirstName", "LastName", "Phone",
"MobilePhone", "MailingCity", "MailingCountry"]
sf_unpivoted = sf_changes.select(
F.col("Id").alias("source_customer_id"),
F.col("LastModifiedDate").alias("source_event_time"),
F.col("_commit_timestamp").alias("ingestion_time"),
*[F.struct(
F.lit(f).alias("source_field"),
F.col(f).cast("string").alias("value")
).alias(f) for f in sf_fields]
).select(
"source_customer_id", "source_event_time", "ingestion_time",
F.explode(F.array(*[F.col(f) for f in sf_fields])).alias("field_struct")
).select(
"source_customer_id", "source_event_time", "ingestion_time",
F.col("field_struct.source_field").alias("source_field"),
F.col("field_struct.value").alias("attribute_value")
).filter(F.col("attribute_value").isNotNull())
# Join to get canonical names + source priority
sf_events = sf_unpivoted \
.join(registry, "source_field") \
.join(spark.table("c360.config.source_priority")
.filter("source_system = 'salesforce'"),
F.lit(True)) \
.select(
resolve_customer_id("source_customer_id", "salesforce")
.alias("customer_id"),
F.col("canonical_name").alias("attribute_name"),
"attribute_value",
"source_event_time",
"ingestion_time",
F.lit("salesforce").alias("source_system"),
F.concat(F.lit("sf-"), F.col("source_customer_id"),
F.lit("-"), F.col("canonical_name"),
F.lit("-"), F.col("source_event_time").cast("string"))
.alias("source_event_id"),
F.col("priority").alias("source_priority")
)
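resolve_customer_id is assumed from the framework post and not shown here. Conceptually it maps a (source_system, source_id) pair onto the unified customer_id. A pure-Python analog of that contract (hypothetical; the PySpark version returns a Column, typically produced by joining an identity-map table):

```python
from typing import Dict, Tuple

def resolve_customer_id_py(source_customer_id: str,
                           source_system: str,
                           id_map: Dict[Tuple[str, str], str]) -> str:
    """Pure-Python analog of the assumed resolve_customer_id helper: look the
    (source_system, source_id) pair up in an identity map, and fall back to a
    namespaced ID so unmatched records are kept rather than dropped."""
    return id_map.get((source_system, source_customer_id),
                      f"{source_system}:{source_customer_id}")
```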
SAP — CDF Replay with Record-Level Timestamp
sap_changes = spark.read.format("delta") \
.option("readChangeData", "true") \
.option("startingVersion", get_last_version("sap_kna1")) \
.table("c360.bronze.sap_kna1") \
.filter(F.col("_change_type").isin(["insert", "update_postimage"]))
sap_fields = ["NAME_FIRST", "NAME_LAST", "CITY1", "COUNTRY",
"TEL1_NUMBR", "SMTP_ADDR"]
# SAP has no field-level timestamp — use LAEDA (last change date)
# converted to timestamp, with ingestion_time as tiebreaker
# Scope the registry to SAP: the `registry` DataFrame loaded above was
# filtered to Salesforce mappings only
sap_registry = spark.table("c360.config.attribute_name_registry") \
    .filter("source_system = 'sap' AND is_active = true") \
    .select("source_field", "canonical_name")
sap_events = build_field_events(
    df=sap_changes,
    id_col="KUNNR",
    fields=sap_fields,
    event_time_col="LAEDA",  # date, cast to timestamp inside the helper
    source_system="sap",
    registry=sap_registry
)
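build_field_events generalises the Salesforce unpivot above, but the post never defines it. A pure-Python analog of the contract it must honour, one event per non-null, registry-mapped field, all stamped with the record-level timestamp; the real implementation would mirror the struct/explode pattern from the Salesforce block:

```python
from typing import Any, Dict, List

def build_field_events_py(records: List[Dict[str, Any]],
                          id_col: str,
                          fields: List[str],
                          event_time_col: str,
                          source_system: str,
                          registry: Dict[str, str]) -> List[Dict[str, Any]]:
    """Pure-Python analog of the assumed build_field_events helper: unpivot a
    record-level change row into one event per non-null, registry-mapped
    field, all sharing the record-level timestamp (the Tier 2 compromise)."""
    events = []
    for rec in records:
        for field in fields:
            value = rec.get(field)
            if value is None or field not in registry:
                continue  # unregistered or empty fields emit no event
            events.append({
                "source_customer_id": str(rec[id_col]),
                "attribute_name": registry[field],
                "attribute_value": str(value),
                "source_event_time": rec[event_time_col],
                "source_system": source_system,
            })
    return events
```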
Shopify — CDF Replay with Consent Field Override
shopify_changes = spark.read.format("delta") \
.option("readChangeData", "true") \
.option("startingVersion", get_last_version("shopify_customer")) \
.table("c360.bronze.shopify_customer") \
.filter(F.col("_change_type").isin(["insert", "update_postimage"]))
# All fields use record-level updated_at EXCEPT emailConsent
# which has its own field-level timestamp
shopify_fields = ["email", "first_name", "last_name", "phone"]
# Scope the registry to Shopify before unpivoting
shopify_registry = spark.table("c360.config.attribute_name_registry") \
    .filter("source_system = 'shopify' AND is_active = true") \
    .select("source_field", "canonical_name")
shopify_events = build_field_events(
    df=shopify_changes,
    id_col="id",
    fields=shopify_fields,
    event_time_col="updated_at",
    source_system="shopify",
    registry=shopify_registry
)
# Consent gets its own precise timestamp (Tier 1 within Tier 2)
consent_events = shopify_changes.select(
resolve_customer_id(F.col("id").cast("string"), "shopify")
.alias("customer_id"),
F.lit("emailConsent").alias("attribute_name"),
F.col("accepts_marketing").cast("string").alias("attribute_value"),
F.col("accepts_marketing_updated_at").alias("source_event_time"),
F.col("_commit_timestamp").alias("ingestion_time"),
F.lit("shopify").alias("source_system"),
F.concat(F.lit("shopify-consent-"), F.col("id").cast("string"))
.alias("source_event_id"),
F.lit(3).alias("source_priority")
).filter(F.col("attribute_value").isNotNull())
shopify_all_events = shopify_events.unionByName(consent_events)
Segment — Direct Field Events (Tier 1)
from pyspark.sql.types import MapType, StringType
# Segment identify calls — traits is already a map of changed fields
segment_new = spark.read.format("delta") \
.option("readChangeData", "true") \
.option("startingVersion", get_last_version("segment_identifies")) \
.table("c360.bronze.segment_identifies") \
.filter(F.col("_change_type") == "insert")
# Explode the traits map — each key/value is one field-level event
segment_events = segment_new \
.select(
F.col("user_id").alias("source_customer_id"),
F.col("timestamp").alias("source_event_time"), # source event time
F.col("received_at").alias("ingestion_time"),
F.col("message_id").alias("source_event_id"),
F.explode(F.col("traits").cast(MapType(StringType(), StringType())))
.alias("source_field", "attribute_value")
) \
.filter(F.col("attribute_value").isNotNull()) \
.join(registry.filter("source_system = 'segment'"), "source_field") \
.select(
resolve_customer_id("source_customer_id", "segment")
.alias("customer_id"),
F.col("canonical_name").alias("attribute_name"),
"attribute_value",
"source_event_time",
"ingestion_time",
F.lit("segment").alias("source_system"),
"source_event_id",
F.lit(5).alias("source_priority")
)
Zendesk — CDF Replay
zendesk_changes = spark.read.format("delta") \
.option("readChangeData", "true") \
.option("startingVersion", get_last_version("zendesk_users")) \
.table("c360.bronze.zendesk_users") \
.filter(F.col("_change_type").isin(["insert", "update_postimage"]))
# Scope the registry to Zendesk before unpivoting
zendesk_registry = spark.table("c360.config.attribute_name_registry") \
    .filter("source_system = 'zendesk' AND is_active = true") \
    .select("source_field", "canonical_name")
zendesk_events = build_field_events(
    df=zendesk_changes,
    id_col="id",
    fields=["email", "name", "phone"],
    event_time_col="updated_at",
    source_system="zendesk",
    registry=zendesk_registry
)
Step 6 — Merge Events into the Attribute History Table
Combine all source events and run a single MERGE into customer_attribute_history. The MERGE closes the previous current row and inserts the new one atomically.
# Combine all source events into one staging DataFrame
# unionByName matches columns by name, guarding against order drift
# between the per-source builders
all_events = (
    sf_events
    .unionByName(sap_events)
    .unionByName(shopify_all_events)
    .unionByName(segment_events)
    .unionByName(zendesk_events)
)
all_events.createOrReplaceTempView("new_attribute_events")
spark.sql("""
MERGE INTO c360.silver.customer_attribute_history AS target
USING (
  WITH deduped AS (
    -- Deduplicate: one event per customer+attribute+source combination,
    -- keeping the latest by source_event_time
    SELECT * FROM (
      SELECT *,
             ROW_NUMBER() OVER (
               PARTITION BY customer_id, attribute_name, source_system
               ORDER BY source_event_time DESC
             ) AS rn
      FROM new_attribute_events
      WHERE customer_id IS NOT NULL
    )
    WHERE rn = 1
  )
  -- SCD2 double-row pattern: every staged event appears once under its
  -- natural key (to close the matching current row), and events whose value
  -- actually changed appear a second time under a NULL key, which never
  -- matches and therefore inserts the new current row in the same MERGE.
  -- Without the second row, changed values would close the old row but
  -- never insert the new one.
  SELECT customer_id AS merge_key, * FROM deduped
  UNION ALL
  SELECT NULL AS merge_key, d.* FROM deduped d
  JOIN c360.silver.customer_attribute_history h
    ON h.customer_id = d.customer_id
   AND h.attribute_name = d.attribute_name
   AND h.source_system = d.source_system
   AND h.is_current = TRUE
  WHERE h.attribute_value != d.attribute_value
    AND d.source_event_time > h.effective_from
) AS source
ON target.customer_id = source.merge_key
   AND target.attribute_name = source.attribute_name
   AND target.source_system = source.source_system
   AND target.is_current = TRUE
-- Close the current row when the value has changed
WHEN MATCHED AND source.attribute_value != target.attribute_value
             AND source.source_event_time > target.effective_from THEN
  UPDATE SET
    target.is_current = FALSE,
    target.effective_to = source.source_event_time
-- Insert the current row for brand-new attributes and for the NULL-key
-- copies of changed values
WHEN NOT MATCHED THEN
  INSERT (
    customer_id, attribute_name, attribute_value,
    effective_from, effective_to, is_current,
    source_system, source_event_id, source_priority, ingestion_time
  )
  VALUES (
    source.customer_id, source.attribute_name, source.attribute_value,
    source.source_event_time, NULL, TRUE,
    source.source_system, source.source_event_id,
    source.source_priority, source.ingestion_time
  )
""")
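The close-and-insert semantics are easy to get subtly wrong, so they are worth pinning down in a small in-memory model (illustrative only; the warehouse version is the MERGE above):

```python
from typing import Any, Dict, List, Optional

def apply_event(history: List[Dict[str, Any]], event: Dict[str, Any]) -> None:
    """In-memory model of the Step 6 semantics: close the current row when a
    newer, different value arrives and insert the new current row; ignore
    stale or unchanged events. Illustrative only."""
    key = (event["customer_id"], event["attribute_name"], event["source_system"])
    current: Optional[Dict[str, Any]] = next(
        (r for r in history
         if (r["customer_id"], r["attribute_name"], r["source_system"]) == key
         and r["is_current"]), None)
    if current is not None:
        if (event["attribute_value"] == current["attribute_value"]
                or event["source_event_time"] <= current["effective_from"]):
            return  # unchanged or stale, no-op
        current["is_current"] = False
        current["effective_to"] = event["source_event_time"]
    history.append({
        "customer_id": event["customer_id"],
        "attribute_name": event["attribute_name"],
        "source_system": event["source_system"],
        "attribute_value": event["attribute_value"],
        "effective_from": event["source_event_time"],
        "effective_to": None,
        "is_current": True,
    })
```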
Step 7 — Build the Gold Resolution View
The resolution query runs after every MERGE cycle. It produces one row per customer per attribute — the winning value — then pivots to a flat profile table in c360.gold.
-- Resolution: pick the winning value per customer per attribute
CREATE OR REPLACE VIEW c360.silver.resolved_attributes AS
SELECT
customer_id,
attribute_name,
attribute_value,
source_system,
effective_from AS field_updated_at,
source_priority
FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY customer_id, attribute_name
ORDER BY
effective_from DESC NULLS LAST, -- most recent event wins
source_priority ASC, -- tiebreaker: source trust rank (the Step 2 rule)
ingestion_time DESC -- final tiebreaker: ingestion order
) AS rn
FROM c360.silver.customer_attribute_history
WHERE is_current = TRUE
AND attribute_value IS NOT NULL
)
WHERE rn = 1;
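The business rule from Step 2, most recent event wins with source priority breaking timestamp ties, can be unit-tested in plain Python before trusting the SQL (illustrative sketch; timestamps shown as integers for brevity):

```python
from typing import Any, Dict, List

def pick_winner(candidates: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Resolution rule for one customer+attribute: newest effective_from wins;
    source priority (1 = highest trust) breaks timestamp ties; ingestion
    order is the final tiebreaker."""
    return min(candidates,
               key=lambda c: (-c["effective_from"],
                              c["source_priority"],
                              -c["ingestion_time"]))
```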
-- Pivot resolved attributes into the flat gold profile table
CREATE OR REPLACE TABLE c360.gold.customer_profile
USING DELTA
TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
AS
SELECT
customer_id,
MAX(CASE WHEN attribute_name = 'email' THEN attribute_value END) AS email,
MAX(CASE WHEN attribute_name = 'email' THEN source_system END) AS email_source,
MAX(CASE WHEN attribute_name = 'email' THEN field_updated_at END) AS email_updated_at,
MAX(CASE WHEN attribute_name = 'firstName' THEN attribute_value END) AS first_name,
MAX(CASE WHEN attribute_name = 'firstName' THEN source_system END) AS first_name_source,
MAX(CASE WHEN attribute_name = 'lastName' THEN attribute_value END) AS last_name,
MAX(CASE WHEN attribute_name = 'phone' THEN attribute_value END) AS phone,
MAX(CASE WHEN attribute_name = 'phone' THEN source_system END) AS phone_source,
MAX(CASE WHEN attribute_name = 'mobilePhone' THEN attribute_value END) AS mobile_phone,
MAX(CASE WHEN attribute_name = 'city' THEN attribute_value END) AS city,
MAX(CASE WHEN attribute_name = 'country' THEN attribute_value END) AS country,
MAX(CASE WHEN attribute_name = 'emailConsent' THEN attribute_value END) AS email_consent,
MAX(CASE WHEN attribute_name = 'emailConsent' THEN field_updated_at END) AS email_consent_updated_at,
CURRENT_TIMESTAMP() AS profile_resolved_at
FROM c360.silver.resolved_attributes
GROUP BY customer_id;
For incremental refreshes after the initial load, use INSERT OVERWRITE partitioned by a hash bucket of customer_id, or rebuild the gold table via a DLT pipeline that triggers automatically after the Silver MERGE completes.
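A deterministic bucket function for scoping those overwrites could look like this (sketch; the bucket count of 64 is an assumption, not a value from the pipeline):

```python
import zlib

def profile_bucket(customer_id: str, n_buckets: int = 64) -> int:
    """Deterministic hash bucket for customer_id, usable both as a gold-table
    partition column and in the predicate of an INSERT OVERWRITE that rewrites
    only the buckets touched since the last run. Sketch; tune n_buckets to
    cluster size."""
    return zlib.crc32(customer_id.encode("utf-8")) % n_buckets
```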
Step 8 — Orchestrate with Databricks Jobs
Wire the notebooks into a single job: five extraction tasks that run in parallel, one per source, followed by a shared MERGE task and a gold-rebuild task:
# Databricks Job definition (simplified)
name: customer_360_pipeline
schedule:
quartz_cron_expression: "0 0 */2 * * ?" # every 2 hours (Quartz: sec min hour dom mon dow)
timezone_id: "Europe/Berlin"
tasks:
- task_key: extract_salesforce
notebook_task:
notebook_path: /pipelines/silver/extract_salesforce
- task_key: extract_sap
notebook_task:
notebook_path: /pipelines/silver/extract_sap
- task_key: extract_shopify
notebook_task:
notebook_path: /pipelines/silver/extract_shopify
- task_key: extract_segment
notebook_task:
notebook_path: /pipelines/silver/extract_segment
- task_key: extract_zendesk
notebook_task:
notebook_path: /pipelines/silver/extract_zendesk
- task_key: merge_attribute_history
depends_on:
- task_key: extract_salesforce
- task_key: extract_sap
- task_key: extract_shopify
- task_key: extract_segment
- task_key: extract_zendesk
notebook_task:
notebook_path: /pipelines/silver/merge_history
- task_key: rebuild_gold_profile
depends_on:
- task_key: merge_attribute_history
notebook_task:
notebook_path: /pipelines/gold/rebuild_customer_profile
Step 9 — Connect Hightouch to the Gold Layer
In Hightouch, add Databricks as a source. Point it to the Unity Catalog endpoint of your workspace using a service principal with SELECT on c360.gold. Then create a Model for each activation use case.
Model: Full Customer Profile — used for initial loads and full refreshes to CRM destinations:
-- Hightouch model: full_customer_profile
SELECT
customer_id,
email,
email_source,
email_updated_at,
first_name,
last_name,
phone,
mobile_phone,
city,
country,
email_consent,
email_consent_updated_at,
profile_resolved_at
FROM c360.gold.customer_profile
WHERE email IS NOT NULL
Model: Consent-Only Updates — for syncing only consent changes to marketing destinations, keeping sync volume low:
-- Hightouch model: consent_updates
SELECT
customer_id,
email,
email_consent,
email_consent_updated_at
FROM c360.gold.customer_profile
WHERE email_consent_updated_at > DATEADD(HOUR, -3, CURRENT_TIMESTAMP())
AND email IS NOT NULL
Step 10 — Configure Hightouch Syncs to Destinations
Sync to Salesforce CRM
Create a Hightouch sync from full_customer_profile to the Salesforce Contact object. Set the match key to email → Salesforce Email. Map fields explicitly — do not use auto-mapping, which can overwrite Salesforce-managed fields with stale values from lower-priority sources:
sync:
name: golden_record_to_salesforce_contact
model: full_customer_profile
destination: salesforce
object: Contact
match_on:
source: email
destination: Email
field_mappings:
- source: first_name
destination: FirstName
- source: last_name
destination: LastName
- source: phone
destination: Phone
- source: mobile_phone
destination: MobilePhone
- source: city
destination: MailingCity
- source: country
destination: MailingCountry
- source: email_source
destination: Golden_Record_Source__c # custom field
- source: profile_resolved_at
destination: Golden_Record_Updated__c # custom field
mode: upsert
schedule:
type: interval
frequency: 2h
Sync Consent to Marketing Automation (Marketo / Braze)
Use the consent_updates model. For Marketo, sync to the Lead object on email match, mapping email_consent to the unsubscribed field (inverted: email_consent = false → unsubscribed = true). For Braze, use the User object and map to email_subscribe with values opted_in / unsubscribed.
sync:
name: consent_to_marketo
model: consent_updates
destination: marketo
object: Lead
match_on:
source: email
destination: email
field_mappings:
- source: "CASE WHEN email_consent = 'true' THEN false ELSE true END"
destination: unsubscribed
mode: update
schedule:
type: interval
frequency: 30m # consent syncs run more frequently
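Both destination mappings reduce to a small translation of the canonical emailConsent string; sketched in Python for clarity (function names are illustrative, and exact destination field names should be verified against each destination's schema):

```python
def to_marketo_unsubscribed(email_consent: str) -> bool:
    """Marketo stores the inverse of consent: 'true' -> unsubscribed False.
    Illustrative helper; the gold layer stores consent as 'true'/'false'."""
    return email_consent != "true"

def to_braze_email_subscribe(email_consent: str) -> str:
    """Braze uses an enum on its email subscription field."""
    return "opted_in" if email_consent == "true" else "unsubscribed"
```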
Sync to SAP for Billing Address Updates
Hightouch supports SAP via the HTTP destination using SAP OData APIs. Map city and country back to the customer master, but only when city_source or country_source indicates a higher-priority system updated those fields:
-- Hightouch model: sap_address_updates
-- (resolved_attributes is tall, so pivot city/country out of attribute_value)
SELECT
  customer_id,
  MAX(CASE WHEN attribute_name = 'city' THEN attribute_value END) AS city,
  MAX(CASE WHEN attribute_name = 'country' THEN attribute_value END) AS country,
  MAX(CASE WHEN attribute_name = 'city' THEN source_system END) AS city_source,
  MAX(CASE WHEN attribute_name = 'country' THEN source_system END) AS country_source
FROM c360.silver.resolved_attributes
WHERE attribute_name IN ('city', 'country')
  AND source_system != 'sap' -- only write back non-SAP winners
GROUP BY customer_id
Step 11 — Monitor Pipeline Health
Add two monitoring queries to a Databricks SQL Dashboard. The first flags sources that have gone quiet — no new events in the expected window:
-- Source freshness check
SELECT
source_system,
MAX(ingestion_time) AS last_event_at,
DATEDIFF(HOUR, MAX(ingestion_time), NOW()) AS hours_since_last_event,
CASE
WHEN DATEDIFF(HOUR, MAX(ingestion_time), NOW()) > 4 THEN 'STALE'
ELSE 'OK'
END AS status
FROM c360.silver.customer_attribute_history
GROUP BY source_system
ORDER BY hours_since_last_event DESC;
The second surfaces resolution conflicts — attributes where two high-priority sources disagree on the current value, which often signals an upstream data quality problem worth investigating:
-- Conflict detection: multiple current values from different sources
SELECT
customer_id,
attribute_name,
COUNT(DISTINCT attribute_value) AS distinct_values,
COLLECT_LIST(source_system) AS sources
FROM c360.silver.customer_attribute_history
WHERE is_current = TRUE
GROUP BY customer_id, attribute_name
HAVING COUNT(DISTINCT attribute_value) > 1
ORDER BY distinct_values DESC
LIMIT 100;
Step 12 — Managing Gold Layer Versions with dbt
The extraction notebooks and MERGE logic in Steps 5–6 are PySpark-first — dbt has no native primitive for Delta Change Data Feed replay. But from the Silver → Gold boundary upward, the pipeline is pure SQL and dbt becomes the right tool. It gives you version-controlled models, co-existing schema versions with deprecation windows, and built-in data quality tests that catch breaking changes before they reach a destination.
Where dbt fits
The recommended split is: Databricks Jobs own the ingestion and MERGE layer; dbt owns the resolution view and gold profile. The job in Step 8 gains a final dbt run task that replaces the notebook-based gold rebuild:
- task_key: rebuild_gold_profile
depends_on:
- task_key: merge_attribute_history
dbt_task:
project_directory: /dbt/customer_360
commands:
- dbt run --select gold
- dbt test --select gold
Project setup
pip install dbt-databricks
dbt init customer_360
# profiles.yml
customer_360:
target: prod
outputs:
prod:
type: databricks
host: <workspace>.azuredatabricks.net
http_path: /sql/1.0/warehouses/<warehouse_id>
token: "{{ env_var('DBT_TOKEN') }}"
catalog: c360
schema: gold
# dbt_project.yml
name: customer_360
version: 1.0.0
models:
customer_360:
gold:
+materialized: table
+contract:
enforced: true # schema contracts — dbt will error on column drift
Versioned gold models
dbt 1.5+ supports explicit model versioning. Define all versions in a single YAML file — dbt materialises them as separate tables (customer_profile_v1, customer_profile_v2) and keeps them live simultaneously until the deprecation date passes.
# models/gold/customer_profile.yml
models:
- name: customer_profile
latest_version: 2
versions:
- v: 1
deprecation_date: "2026-09-01" # v1 stays live; consumers get a migration window
defined_in: customer_profile_v1
- v: 2
defined_in: customer_profile_v2
columns:
- name: customer_id
data_type: string
constraints:
- type: not_null
- name: email
data_type: string
-- models/gold/customer_profile_v1.sql
-- Original flat schema — maintained for backward compatibility
SELECT
customer_id,
email,
first_name,
last_name,
phone,
mobile_phone,
city,
country,
email_consent,
email_consent_updated_at,
profile_resolved_at
FROM {{ ref('resolved_attributes') }}
-- pivot logic identical to Step 7
-- models/gold/customer_profile_v2.sql
-- v2 adds source attribution columns and a nested extensions map
SELECT
customer_id,
email,
email_source,
email_updated_at,
first_name,
first_name_source,
last_name,
last_name_source,
phone,
phone_source,
mobile_phone,
city,
city_source,
country,
country_source,
email_consent,
email_consent_updated_at,
-- MACH ODM extensions namespace
MAP(
'sfdc_contact_id', sfdc_contact_id,
'sap_kunnr', sap_kunnr
) AS extensions,
profile_resolved_at
FROM {{ ref('resolved_attributes') }}
Downstream consumers pin to a version
Any model or Hightouch SQL query that depends on the gold profile pins explicitly to a version. When v2 ships, Hightouch models and BI queries can migrate on their own schedule; v1 stays live until its deprecation date.
-- Hightouch model (v1 pin — migrates before 2026-09-01)
SELECT * FROM c360.gold.customer_profile_v1
WHERE email IS NOT NULL
-- dbt downstream model (v2 already)
SELECT * FROM {{ ref('customer_profile', version=2) }}
Schema contracts and breaking-change tests
dbt's schema contracts enforce column types at materialisation time. Add generic tests to the gold models so a removed column or type mismatch fails the job before it reaches Hightouch:
# models/gold/customer_profile.yml (continued)
columns:
- name: email
data_type: string
tests:
- not_null
- dbt_utils.not_empty_string
- name: email_consent
data_type: string
tests:
- accepted_values:
values: ['true', 'false']
- name: profile_resolved_at
data_type: timestamp
tests:
- not_null
- dbt_utils.recency:
datepart: hour
field: profile_resolved_at
interval: 4 # fails if gold hasn't refreshed in 4 hours
Handling ODM version upgrades
When the MACH Open Data Model releases a new version (e.g., ODM v2 adds a loyaltyTier canonical field), the change is isolated to three places: add the new canonical name to the attribute name registry, add the column to customer_profile_v2.sql, and create customer_profile_v3.sql if the change is breaking. The history table schema — customer_attribute_history — is never altered. It stores any canonical name as a string row; ODM versioning only affects how the serving layer reads those rows.
-- Adding a new canonical field is always non-breaking:
-- 1. Insert into registry
INSERT INTO c360.config.attribute_name_registry VALUES
('salesforce', 'Loyalty_Tier__c', 'loyaltyTier', 'v2', true);
-- 2. Add to customer_profile_v2.sql
-- MAX(CASE WHEN attribute_name = 'loyaltyTier' THEN attribute_value END) AS loyalty_tier,
-- 3. No schema migration needed on customer_attribute_history
Removing or renaming a canonical field is a breaking change — it warrants a new dbt model version with a deprecation window, not an in-place edit of the existing model.
What You Have Built
At this point the pipeline is fully operational: Fivetran syncs from Salesforce, SAP, Shopify, and Zendesk land in Bronze every two hours; Segment identify events stream in continuously; the Silver extraction notebooks run in parallel and feed a single MERGE into the attribute history table; the resolution algorithm produces a correct, source-attributed, field-level golden record in Gold; and Hightouch syncs the resolved profile back to every destination on a cadence appropriate to each use case.
The architecture is deliberately modular. Adding a new source — a Klaviyo email platform, a Loyalty system, a B2B data enrichment vendor — means adding rows to the attribute name registry, writing one extraction notebook following the same pattern, and adding a task to the job. The history table schema, the resolution algorithm, and the Hightouch syncs are unchanged. New destinations follow the same pattern on the Hightouch side. The framework scales horizontally by design.
The monitoring queries above are the operational heartbeat. Run them on a schedule and alert on STALE sources before they cause a resolution gap. The conflict detection query is your data quality radar — a spike in conflicts typically precedes a visible customer experience problem by one or two pipeline cycles, giving you time to fix the root cause before it reaches a customer.
Frequently Asked Questions
How do you implement Customer 360 on Databricks?
Customer 360 on Databricks uses a medallion architecture: Bronze tables store raw source events with Delta Change Data Feed enabled; Silver holds the attribute history and runs field-level resolution via an incremental MERGE over Change Data Feed events; Gold materialises the resolved customer profile. dbt manages Silver-to-Gold transformation with versioned models and schema contracts.
What is Hightouch used for in a Customer 360 architecture?
Hightouch is the activation layer — it reads the resolved Gold customer profile from Databricks and syncs it back to CRM, marketing automation, ad platforms, and service tools via reverse ETL. It removes the need for custom sync code per destination and handles incremental updates, schema mapping, and error handling.
What is the difference between Tier 1 and Tier 2 sources?
Tier 1 sources provide native field-level timestamps — each attribute carries its own event time. Segment's identify() call is Tier 1. Tier 2 sources provide only record-level timestamps; field-level change events must be derived by comparing consecutive snapshots at ingestion. Salesforce, SAP, Shopify, and Zendesk are Tier 2 for most fields.
Which source systems are covered in this Customer 360 implementation?
The implementation covers five enterprise source systems: Salesforce (CRM, via Fivetran CDC), SAP ECC / S4HANA (via Fivetran, using LAEDA as event time), Shopify (e-commerce, via Fivetran and webhook), Segment (CDP identify events with native field-level timestamps), and Zendesk (support, via Fivetran). Each has source-specific extraction logic and canonical field mapping.