
Building Customer 360 on Databricks and Hightouch: A Complete Implementation Guide

Santosh Pradhan · April 14, 2026

By Santosh Pradhan, MarTech Solutions Architect · Munich, Germany

The Golden Record Framework establishes the principles for field-level Customer 360 resolution. This post is the implementation guide — exact steps, working code, and source-system-specific extraction logic for building it on Databricks Delta Lake with Hightouch as the activation layer. By the end you will have a running pipeline that ingests customer data from Salesforce, SAP, Shopify, Segment, and Zendesk; resolves field-level conflicts correctly; and syncs the golden record back to every destination your teams need.

Architecture Overview

The pipeline follows a medallion architecture within Databricks Unity Catalog, extended with two supporting zones. Raw source data lands in Bronze without transformation. Field-level change events are extracted and normalised in Silver, where the attribute history table lives and the resolution algorithm runs. The resolved golden record is materialised in Gold. Hightouch reads from Gold and syncs to activation destinations via reverse ETL. A configuration layer — the attribute name registry and the source priority table — sits in its own config schema and governs the mapping from source field names to canonical names throughout.

Source systems connect through Fivetran for managed connectors (Salesforce, SAP, Shopify, Zendesk) and through Segment webhooks or Kafka for streaming event sources. All Bronze tables have Delta Change Data Feed enabled, which makes incremental Silver processing efficient without full table scans.

Prerequisites

Before starting: a Databricks workspace with Unity Catalog enabled; a Fivetran account with connectors licensed for Salesforce, SAP, Shopify, and Zendesk; a Segment source with HTTP webhook or S3 destination configured; and a Hightouch workspace connected to Databricks as a data source. All SQL below runs on Databricks SQL or in a DLT pipeline notebook.

Step 1 — Set Up the Unity Catalog Schemas

Create four schemas under a single catalog. Using a dedicated catalog keeps Customer 360 assets isolated from other data products.

CREATE CATALOG IF NOT EXISTS c360;

CREATE SCHEMA IF NOT EXISTS c360.bronze;   -- raw source tables, append-only
CREATE SCHEMA IF NOT EXISTS c360.silver;   -- field-change history, resolution
CREATE SCHEMA IF NOT EXISTS c360.gold;     -- serving layer, activation-ready
CREATE SCHEMA IF NOT EXISTS c360.config;   -- attribute name registry, source priority

Delta table properties do not inherit from the catalog, so there is no single ALTER CATALOG switch for this. Instead, set the Delta default on the cluster or SQL warehouse that runs the pipeline, so every new Delta table is created with Change Data Feed enabled:

SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;

Step 2 — Build the Configuration Tables

The attribute name registry maps source field names to canonical ODM-aligned names. The source priority table defines which system wins when timestamps are equal. These two tables are the only place where business decisions live — everything else in the pipeline is mechanical.

-- Attribute name registry: source field → canonical name
CREATE TABLE IF NOT EXISTS c360.config.attribute_name_registry (
  source_system     STRING NOT NULL,
  source_field      STRING NOT NULL,
  canonical_name    STRING NOT NULL,  -- ODM-aligned: email, firstName, phone, etc.
  registry_version  STRING NOT NULL DEFAULT 'v1',
  is_active         BOOLEAN NOT NULL DEFAULT TRUE
)
TBLPROPERTIES ('delta.feature.allowColumnDefaults' = 'supported');  -- DEFAULT values require this table feature on Delta

-- Seed with mappings for each source system
INSERT INTO c360.config.attribute_name_registry VALUES
-- Salesforce CRM
('salesforce', 'Email',         'email',        'v1', true),
('salesforce', 'FirstName',     'firstName',    'v1', true),
('salesforce', 'LastName',      'lastName',     'v1', true),
('salesforce', 'Phone',         'phone',        'v1', true),
('salesforce', 'MobilePhone',   'mobilePhone',  'v1', true),
('salesforce', 'MailingCity',   'city',         'v1', true),
('salesforce', 'MailingCountry','country',      'v1', true),
-- SAP ERP
('sap',        'SMTP_ADDR',     'email',        'v1', true),
('sap',        'NAME_FIRST',    'firstName',    'v1', true),
('sap',        'NAME_LAST',     'lastName',     'v1', true),
('sap',        'TEL1_NUMBR',    'phone',        'v1', true),
('sap',        'CITY1',         'city',         'v1', true),
('sap',        'COUNTRY',       'country',      'v1', true),
-- Shopify
('shopify',    'email',         'email',        'v1', true),
('shopify',    'first_name',    'firstName',    'v1', true),
('shopify',    'last_name',     'lastName',     'v1', true),
('shopify',    'phone',         'phone',        'v1', true),
('shopify',    'accepts_marketing', 'emailConsent', 'v1', true),
-- Segment (traits are already snake_case, map to canonical)
('segment',    'email',         'email',        'v1', true),
('segment',    'first_name',    'firstName',    'v1', true),
('segment',    'last_name',     'lastName',     'v1', true),
('segment',    'phone',         'phone',        'v1', true),
-- Zendesk
('zendesk',    'email',         'email',        'v1', true),
('zendesk',    'name',          'fullName',     'v1', true),
('zendesk',    'phone',         'phone',        'v1', true);

-- Source priority: lower number = higher trust
CREATE TABLE IF NOT EXISTS c360.config.source_priority (
  source_system  STRING  NOT NULL,
  priority       INTEGER NOT NULL,  -- 1 = highest trust
  notes          STRING
);

INSERT INTO c360.config.source_priority VALUES
('salesforce', 1, 'CRM is master for contact data post-sales'),
('sap',        2, 'ERP is master for billing address and account data'),
('shopify',    3, 'Commerce is master for email consent and purchase contact'),
('zendesk',    4, 'Support portal self-service updates'),
('segment',    5, 'Behavioural tracking — lowest trust for PII fields');
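To see how the two config tables interact, here is a minimal pure-Python sketch (illustrative only; the pipeline does the equivalent with DataFrame joins against the Delta tables): map a source field to its canonical name, and rank sources by trust.

```python
from typing import Optional

# Illustrative in-memory slices of the two config tables (values taken
# from the seed INSERTs above). In the pipeline these live in Delta and
# are joined in Spark; the lookup shape is the same.
REGISTRY = {
    ("salesforce", "Email"): "email",
    ("sap", "SMTP_ADDR"): "email",
    ("shopify", "accepts_marketing"): "emailConsent",
}
PRIORITY = {"salesforce": 1, "sap": 2, "shopify": 3, "zendesk": 4, "segment": 5}

def canonicalize(source_system: str, source_field: str) -> Optional[str]:
    """Return the canonical ODM-aligned name, or None for unmapped
    fields (the extraction join simply drops those)."""
    return REGISTRY.get((source_system, source_field))

def trust_rank(source_system: str) -> int:
    """Lower number = higher trust, as in c360.config.source_priority."""
    return PRIORITY[source_system]
```

Unmapped fields returning None rather than raising is deliberate: a new source field should be invisible to the pipeline until someone consciously adds a registry row for it.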

Step 3 — Connect Source Systems and Build Bronze Tables

Salesforce CRM (Fivetran → Tier 2 CDC)

Configure the Fivetran Salesforce connector to sync the Contact and Account objects into c360.bronze.salesforce_contact. Fivetran's Salesforce connector uses the Bulk API and writes a _fivetran_synced timestamp and a _fivetran_deleted flag. Enable CDF on arrival:

-- Fivetran writes to this table automatically; confirm CDF is on
ALTER TABLE c360.bronze.salesforce_contact
SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');

-- Key columns Fivetran provides:
-- Id, Email, FirstName, LastName, Phone, MobilePhone,
-- MailingCity, MailingCountry, LastModifiedDate,
-- _fivetran_synced, _fivetran_deleted

SAP ERP (Fivetran SAP ECC/S4HANA → Tier 2)

The Fivetran SAP connector replicates the KNA1 (general customer master) and ADR6 (email address) tables. SAP does not emit field-level timestamps — only LAEDA (last change date) at the record level — making it a classic Tier 2 source.

ALTER TABLE c360.bronze.sap_kna1
SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');

-- Key columns from KNA1 + ADR6 join:
-- KUNNR (customer_id), NAME_FIRST, NAME_LAST, CITY1, COUNTRY,
-- TEL1_NUMBR, SMTP_ADDR (from ADR6), LAEDA (last change date)

Shopify / Magento (Fivetran → Tier 2, with webhook upgrade path)

The Fivetran Shopify connector syncs the customer table with updated_at at the record level. For higher-fidelity field tracking, configure Shopify webhooks for customers/update events to land in a Bronze streaming table via an AWS Lambda or Databricks Auto Loader from S3. The webhook payload carries the full customer object including updated_at, but still not per-field timestamps — treat as Tier 2.

ALTER TABLE c360.bronze.shopify_customer
SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');

-- Key columns: id, email, first_name, last_name, phone,
-- accepts_marketing, accepts_marketing_updated_at,
-- updated_at, _fivetran_synced

Note that Shopify does provide accepts_marketing_updated_at as a field-level timestamp for consent — use it as source_event_time for the emailConsent attribute instead of the record-level updated_at. This is a Tier 1 attribute embedded in an otherwise Tier 2 source.

Segment Tracking (Tier 1 — field-level events natively)

Configure the Segment Databricks destination to write identify calls directly to c360.bronze.segment_identifies. Each identify() call is already a field-level event: it carries a userId, a traits JSON object containing only the fields that changed, and a precise timestamp from the source event.

-- Segment Databricks destination schema (auto-created by Segment):
-- message_id, user_id, anonymous_id, timestamp (source event time),
-- received_at (ingestion time), traits (variant/map)
ALTER TABLE c360.bronze.segment_identifies
SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');

Zendesk (Fivetran → Tier 2)

ALTER TABLE c360.bronze.zendesk_users
SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');

-- Key columns: id, email, name, phone, updated_at,
-- _fivetran_synced, _fivetran_deleted

Step 4 — Build the Silver Attribute History Table

This is the single most important table in the pipeline. Everything upstream feeds into it; everything downstream reads from it.

CREATE TABLE IF NOT EXISTS c360.silver.customer_attribute_history (
  customer_id       STRING    NOT NULL,
  attribute_name    STRING    NOT NULL,  -- canonical name from registry
  attribute_value   STRING,
  effective_from    TIMESTAMP NOT NULL,
  effective_to      TIMESTAMP,
  is_current        BOOLEAN   NOT NULL,
  source_system     STRING    NOT NULL,
  source_event_id   STRING,
  source_priority   INTEGER   NOT NULL,
  ingestion_time    TIMESTAMP NOT NULL
)
USING DELTA
TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
CLUSTER BY (customer_id, attribute_name);

Step 5 — Extract Field-Level Changes per Source

Each source has a dedicated extraction notebook that reads new changes incrementally from the Delta Change Data Feed. The output of every notebook is a standardised staging DataFrame that feeds into the MERGE in Step 6.

Salesforce — CDF Replay

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Read only new changes since last checkpoint
sf_changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", get_last_version("salesforce_contact")) \
    .table("c360.bronze.salesforce_contact") \
    .filter(F.col("_change_type").isin(["insert", "update_postimage"]))

# Load registry for Salesforce
registry = spark.table("c360.config.attribute_name_registry") \
    .filter("source_system = 'salesforce' AND is_active = true") \
    .select("source_field", "canonical_name")

# Unpivot: one row per changed field per record
sf_fields = ["Email", "FirstName", "LastName", "Phone",
             "MobilePhone", "MailingCity", "MailingCountry"]

sf_unpivoted = sf_changes.select(
    F.col("Id").alias("source_customer_id"),
    F.col("LastModifiedDate").alias("source_event_time"),
    F.col("_commit_timestamp").alias("ingestion_time"),
    *[F.struct(
        F.lit(f).alias("source_field"),
        F.col(f).cast("string").alias("value")
      ).alias(f) for f in sf_fields]
).select(
    "source_customer_id", "source_event_time", "ingestion_time",
    F.explode(F.array(*[F.col(f) for f in sf_fields])).alias("field_struct")
).select(
    "source_customer_id", "source_event_time", "ingestion_time",
    F.col("field_struct.source_field").alias("source_field"),
    F.col("field_struct.value").alias("attribute_value")
).filter(F.col("attribute_value").isNotNull())

# Join to get canonical names, then attach the source priority row
sf_events = sf_unpivoted \
    .join(registry, "source_field") \
    .crossJoin(spark.table("c360.config.source_priority")
                   .filter("source_system = 'salesforce'")
                   .select("priority")) \
    .select(
        resolve_customer_id("source_customer_id", "salesforce")
            .alias("customer_id"),
        F.col("canonical_name").alias("attribute_name"),
        "attribute_value",
        "source_event_time",
        "ingestion_time",
        F.lit("salesforce").alias("source_system"),
        F.concat(F.lit("sf-"), F.col("source_customer_id"),
                 F.lit("-"), F.col("canonical_name"),
                 F.lit("-"), F.col("source_event_time").cast("string"))
          .alias("source_event_id"),
        F.col("priority").alias("source_priority")
    )
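The code above calls resolve_customer_id() without defining it. Its job is identity resolution: translating a source-native key (a Salesforce Contact Id, an SAP KUNNR) into the unified customer_id. A full matching strategy is out of scope here, but a minimal sketch, assuming a pre-built identity map keyed by (source_system, source_id), looks like this in pure Python (the notebook version would be a broadcast join against the same map; the table name and sample IDs below are hypothetical):

```python
from typing import Optional

# Hypothetical identity map. In the pipeline this would be a Delta table
# (e.g. an identity_map table in Silver) maintained by a matching job.
IDENTITY_MAP = {
    ("salesforce", "003XX0001"): "cust-42",
    ("shopify", "991"): "cust-42",
    ("segment", "u-77"): "cust-43",
}

def resolve_customer_id(source_id: str, source_system: str) -> Optional[str]:
    """Map a source-native ID to the unified customer key.
    Unresolved IDs return None; the staging query filters out
    NULL customer_id rows before the MERGE."""
    return IDENTITY_MAP.get((source_system, source_id))
```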

SAP — CDF Replay with Record-Level Timestamp

sap_changes = spark.read.format("delta") \
    .option("readChangeData", "true") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", get_last_version("sap_kna1")) \
    .table("c360.bronze.sap_kna1") \
    .filter(F.col("_change_type").isin(["insert", "update_postimage"]))

sap_fields = ["NAME_FIRST", "NAME_LAST", "CITY1", "COUNTRY",
              "TEL1_NUMBR", "SMTP_ADDR"]

# SAP has no field-level timestamp — use LAEDA (last change date)
# converted to timestamp, with ingestion_time as tiebreaker
# SAP slice of the registry (the `registry` variable above was
# filtered to Salesforce, so load the SAP mappings here)
sap_registry = spark.table("c360.config.attribute_name_registry") \
    .filter("source_system = 'sap' AND is_active = true") \
    .select("source_field", "canonical_name")

sap_events = build_field_events(
    df=sap_changes,
    id_col="KUNNR",
    fields=sap_fields,
    event_time_col="LAEDA",       # date → cast to timestamp
    source_system="sap",
    registry=sap_registry
)
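build_field_events() encapsulates the same unpivot pattern spelled out for Salesforce. Here is a hedged sketch of its core transformation in pure Python over plain dicts, so the mechanics are visible; the real helper does the equivalent with struct/explode on a DataFrame, and identity resolution is elided:

```python
def build_field_events(records, id_col, fields, event_time_col,
                       source_system, registry, priority):
    """Unpivot record-level rows into one event per non-null field.
    `registry` maps source field -> canonical name; null values and
    unmapped fields are dropped. Pure-Python sketch of the PySpark
    helper used in the extraction notebooks."""
    events = []
    for rec in records:
        for field in fields:
            value = rec.get(field)
            canonical = registry.get(field)
            if value is None or canonical is None:
                continue
            events.append({
                "customer_id": rec[id_col],   # identity resolution elided
                "attribute_name": canonical,
                "attribute_value": str(value),
                "source_event_time": rec[event_time_col],
                "source_system": source_system,
                "source_priority": priority,
            })
    return events
```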

Shopify — CDF Replay with Consent Field Override

shopify_changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", get_last_version("shopify_customer")) \
    .table("c360.bronze.shopify_customer") \
    .filter(F.col("_change_type").isin(["insert", "update_postimage"]))

# All fields use record-level updated_at EXCEPT emailConsent
# which has its own field-level timestamp
shopify_fields = ["email", "first_name", "last_name", "phone"]

# Shopify slice of the registry
shopify_registry = spark.table("c360.config.attribute_name_registry") \
    .filter("source_system = 'shopify' AND is_active = true") \
    .select("source_field", "canonical_name")

shopify_events = build_field_events(
    df=shopify_changes,
    id_col="id",
    fields=shopify_fields,
    event_time_col="updated_at",
    source_system="shopify",
    registry=shopify_registry
)

# Consent gets its own precise timestamp (Tier 1 within Tier 2)
consent_events = shopify_changes.select(
    resolve_customer_id(F.col("id").cast("string"), "shopify")
        .alias("customer_id"),
    F.lit("emailConsent").alias("attribute_name"),
    F.col("accepts_marketing").cast("string").alias("attribute_value"),
    F.col("accepts_marketing_updated_at").alias("source_event_time"),
    F.col("_commit_timestamp").alias("ingestion_time"),
    F.lit("shopify").alias("source_system"),
    F.concat(F.lit("shopify-consent-"), F.col("id").cast("string"))
        .alias("source_event_id"),
    F.lit(3).alias("source_priority")
).filter(F.col("attribute_value").isNotNull())

# unionByName guards against column-order drift between the two event sets
shopify_all_events = shopify_events.unionByName(consent_events)

Segment — Direct Field Events (Tier 1)

from pyspark.sql.types import MapType, StringType

# Segment identify calls — traits is already a map of changed fields
segment_new = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", get_last_version("segment_identifies")) \
    .table("c360.bronze.segment_identifies") \
    .filter(F.col("_change_type") == "insert")

# Segment slice of the registry (the `registry` variable above was
# filtered to Salesforce and has no source_system column)
seg_registry = spark.table("c360.config.attribute_name_registry") \
    .filter("source_system = 'segment' AND is_active = true") \
    .select("source_field", "canonical_name")

# Explode the traits map — each key/value is one field-level event
segment_events = segment_new \
    .select(
        F.col("user_id").alias("source_customer_id"),
        F.col("timestamp").alias("source_event_time"),   # source event time
        F.col("received_at").alias("ingestion_time"),
        F.col("message_id").alias("source_event_id"),
        F.explode(F.col("traits").cast(MapType(StringType(), StringType())))
            .alias("source_field", "attribute_value")
    ) \
    .filter(F.col("attribute_value").isNotNull()) \
    .join(seg_registry, "source_field") \
    .select(
        resolve_customer_id("source_customer_id", "segment")
            .alias("customer_id"),
        F.col("canonical_name").alias("attribute_name"),
        "attribute_value",
        "source_event_time",
        "ingestion_time",
        F.lit("segment").alias("source_system"),
        "source_event_id",
        F.lit(5).alias("source_priority")
    )

Zendesk — CDF Replay

zendesk_changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", get_last_version("zendesk_users")) \
    .table("c360.bronze.zendesk_users") \
    .filter(F.col("_change_type").isin(["insert", "update_postimage"]))

# Zendesk slice of the registry
zendesk_registry = spark.table("c360.config.attribute_name_registry") \
    .filter("source_system = 'zendesk' AND is_active = true") \
    .select("source_field", "canonical_name")

zendesk_events = build_field_events(
    df=zendesk_changes,
    id_col="id",
    fields=["email", "name", "phone"],
    event_time_col="updated_at",
    source_system="zendesk",
    registry=zendesk_registry
)

Step 6 — Merge Events into the Attribute History Table

Combine all source events and run a single MERGE into customer_attribute_history. The MERGE closes the previous current row and inserts the new one atomically.

# Combine all source events into one staging DataFrame
# (unionByName avoids silent column-order mismatches across sources)
all_events = (
    sf_events
    .unionByName(sap_events)
    .unionByName(shopify_all_events)
    .unionByName(segment_events)
    .unionByName(zendesk_events)
)

all_events.createOrReplaceTempView("new_attribute_events")

spark.sql("""
MERGE INTO c360.silver.customer_attribute_history AS target
USING (
  -- Deduplicate: one event per customer+attribute+source combination,
  -- keeping the latest by source_event_time
  WITH deduped AS (
    SELECT *
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (
          PARTITION BY customer_id, attribute_name, source_system
          ORDER BY source_event_time DESC
        ) AS rn
      FROM new_attribute_events
      WHERE customer_id IS NOT NULL
    )
    WHERE rn = 1
  )
  -- SCD2 union trick: a single MERGE cannot both update a matched row
  -- and insert from the same source row, so each changed event appears
  -- twice — once with its real key (matches and closes the current row)
  -- and once with a NULL merge key (never matches, so the NOT MATCHED
  -- branch inserts the new current row in the same atomic MERGE).
  SELECT d.customer_id AS merge_customer_id, d.* FROM deduped d
  UNION ALL
  SELECT NULL AS merge_customer_id, d.*
  FROM deduped d
  JOIN c360.silver.customer_attribute_history t
    ON  t.customer_id    = d.customer_id
    AND t.attribute_name = d.attribute_name
    AND t.source_system  = d.source_system
    AND t.is_current     = TRUE
  WHERE d.attribute_value != t.attribute_value
    AND d.source_event_time > t.effective_from
) AS source
ON  target.customer_id    = source.merge_customer_id
AND target.attribute_name = source.attribute_name
AND target.source_system  = source.source_system
AND target.is_current     = TRUE

-- Close the current row when the value has changed
WHEN MATCHED AND source.attribute_value != target.attribute_value
  AND source.source_event_time > target.effective_from THEN
  UPDATE SET
    target.is_current   = FALSE,
    target.effective_to = source.source_event_time

-- Insert the new current row for new or changed values
WHEN NOT MATCHED THEN
  INSERT (
    customer_id, attribute_name, attribute_value,
    effective_from, effective_to, is_current,
    source_system, source_event_id, source_priority, ingestion_time
  )
  VALUES (
    source.customer_id, source.attribute_name, source.attribute_value,
    source.source_event_time, NULL, TRUE,
    source.source_system, source.source_event_id,
    source.source_priority, source.ingestion_time
  )
""")
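The close-and-insert semantics are easier to verify in miniature. This pure-Python sketch applies one event to an in-memory history list for a single (customer, attribute, source) slice, the same way the MERGE behaves: unchanged or stale values are no-ops, changed values close the old row and append a new current one.

```python
def apply_event(history, event):
    """Mimic the MERGE semantics on a list of row dicts for one
    (customer_id, attribute_name, source_system) slice.
    Timestamps are simplified to comparable integers."""
    current = next((r for r in history if r["is_current"]), None)
    if current is not None:
        if (event["attribute_value"] == current["attribute_value"]
                or event["source_event_time"] <= current["effective_from"]):
            return history  # no-op: same value, or stale out-of-order event
        # close the current row
        current["is_current"] = False
        current["effective_to"] = event["source_event_time"]
    # insert the new current row
    history.append({
        "attribute_value": event["attribute_value"],
        "effective_from": event["source_event_time"],
        "effective_to": None,
        "is_current": True,
    })
    return history
```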

Step 7 — Build the Gold Resolution View

The resolution query runs after every MERGE cycle. It produces one row per customer per attribute — the winning value — then pivots to a flat profile table in c360.gold.

-- Resolution: pick the winning value per customer per attribute
CREATE OR REPLACE VIEW c360.silver.resolved_attributes AS
SELECT
  customer_id,
  attribute_name,
  attribute_value,
  source_system,
  effective_from  AS field_updated_at,
  source_priority
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY customer_id, attribute_name
      ORDER BY
        effective_from   DESC,   -- most recent field-level event wins
        source_priority  ASC,    -- tiebreaker: source trust rank (config table)
        ingestion_time   DESC    -- final tiebreaker: latest ingestion
    ) AS rn
  FROM c360.silver.customer_attribute_history
  WHERE is_current = TRUE
    AND attribute_value IS NOT NULL
)
WHERE rn = 1;
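The ordering logic condenses to a single sort key: newest event first, then source trust rank (as defined in the priority config table) to break timestamp ties. A pure-Python sketch with simplified integer timestamps:

```python
def pick_winner(candidates):
    """Return the winning candidate for one (customer, attribute):
    most recent effective_from first, then source trust rank
    (lower = more trusted), then latest ingestion_time.
    Timestamps are simplified to comparable integers."""
    return min(
        candidates,
        key=lambda c: (-c["effective_from"],
                       c["source_priority"],
                       -c["ingestion_time"]),
    )
```

With equal effective_from timestamps, a priority-1 source (CRM) beats a priority-2 source (ERP), which is exactly the behaviour the source priority table encodes.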
-- Pivot resolved attributes into the flat gold profile table
CREATE OR REPLACE TABLE c360.gold.customer_profile
USING DELTA
TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
AS
SELECT
  customer_id,
  MAX(CASE WHEN attribute_name = 'email'        THEN attribute_value END) AS email,
  MAX(CASE WHEN attribute_name = 'email'        THEN source_system   END) AS email_source,
  MAX(CASE WHEN attribute_name = 'email'        THEN field_updated_at END) AS email_updated_at,
  MAX(CASE WHEN attribute_name = 'firstName'    THEN attribute_value END) AS first_name,
  MAX(CASE WHEN attribute_name = 'firstName'    THEN source_system   END) AS first_name_source,
  MAX(CASE WHEN attribute_name = 'lastName'     THEN attribute_value END) AS last_name,
  MAX(CASE WHEN attribute_name = 'phone'        THEN attribute_value END) AS phone,
  MAX(CASE WHEN attribute_name = 'phone'        THEN source_system   END) AS phone_source,
  MAX(CASE WHEN attribute_name = 'mobilePhone'  THEN attribute_value END) AS mobile_phone,
  MAX(CASE WHEN attribute_name = 'city'         THEN attribute_value END) AS city,
  MAX(CASE WHEN attribute_name = 'country'      THEN attribute_value END) AS country,
  MAX(CASE WHEN attribute_name = 'emailConsent' THEN attribute_value END) AS email_consent,
  MAX(CASE WHEN attribute_name = 'emailConsent' THEN field_updated_at END) AS email_consent_updated_at,
  CURRENT_TIMESTAMP() AS profile_resolved_at
FROM c360.silver.resolved_attributes
GROUP BY customer_id;

For incremental refreshes after the initial load, use INSERT OVERWRITE partitioned by a hash bucket of customer_id, or rebuild the gold table via a DLT pipeline that triggers automatically after the Silver MERGE completes.

Step 8 — Orchestrate with Databricks Jobs

Wire the notebooks into a job: five extraction tasks that run in parallel, a merge task that depends on all of them, and a gold rebuild task:

# Databricks Job definition (simplified)
name: customer_360_pipeline
schedule:
  quartz_cron_expression: "0 0 */2 * * ?"   # every 2 hours
  timezone_id: "Europe/Berlin"

tasks:
  - task_key: extract_salesforce
    notebook_task:
      notebook_path: /pipelines/silver/extract_salesforce
  - task_key: extract_sap
    notebook_task:
      notebook_path: /pipelines/silver/extract_sap
  - task_key: extract_shopify
    notebook_task:
      notebook_path: /pipelines/silver/extract_shopify
  - task_key: extract_segment
    notebook_task:
      notebook_path: /pipelines/silver/extract_segment
  - task_key: extract_zendesk
    notebook_task:
      notebook_path: /pipelines/silver/extract_zendesk

  - task_key: merge_attribute_history
    depends_on:
      - task_key: extract_salesforce
      - task_key: extract_sap
      - task_key: extract_shopify
      - task_key: extract_segment
      - task_key: extract_zendesk
    notebook_task:
      notebook_path: /pipelines/silver/merge_history

  - task_key: rebuild_gold_profile
    depends_on:
      - task_key: merge_attribute_history
    notebook_task:
      notebook_path: /pipelines/gold/rebuild_customer_profile

Step 9 — Connect Hightouch to the Gold Layer

In Hightouch, add Databricks as a source. Point it to the Unity Catalog endpoint of your workspace using a service principal with SELECT on c360.gold. Then create a Model for each activation use case.

Model: Full Customer Profile — used for initial loads and full refreshes to CRM destinations:

-- Hightouch model: full_customer_profile
SELECT
  customer_id,
  email,
  email_source,
  email_updated_at,
  first_name,
  last_name,
  phone,
  mobile_phone,
  city,
  country,
  email_consent,
  email_consent_updated_at,
  profile_resolved_at
FROM c360.gold.customer_profile
WHERE email IS NOT NULL

Model: Consent-Only Updates — for syncing only consent changes to marketing destinations, keeping sync volume low:

-- Hightouch model: consent_updates
SELECT
  customer_id,
  email,
  email_consent,
  email_consent_updated_at
FROM c360.gold.customer_profile
WHERE email_consent_updated_at > DATEADD(HOUR, -3, CURRENT_TIMESTAMP())
  AND email IS NOT NULL

Step 10 — Configure Hightouch Syncs to Destinations

Sync to Salesforce CRM

Create a Hightouch sync from full_customer_profile to the Salesforce Contact object. Set the match key to email → Salesforce Email. Map fields explicitly — do not use auto-mapping, which can overwrite Salesforce-managed fields with stale values from lower-priority sources:

sync:
  name: golden_record_to_salesforce_contact
  model: full_customer_profile
  destination: salesforce
  object: Contact
  match_on:
    source: email
    destination: Email
  field_mappings:
    - source: first_name
      destination: FirstName
    - source: last_name
      destination: LastName
    - source: phone
      destination: Phone
    - source: mobile_phone
      destination: MobilePhone
    - source: city
      destination: MailingCity
    - source: country
      destination: MailingCountry
    - source: email_source
      destination: Golden_Record_Source__c     # custom field
    - source: profile_resolved_at
      destination: Golden_Record_Updated__c   # custom field
  mode: upsert
  schedule:
    type: interval
    frequency: 2h

Sync Consent to Marketing Automation (Marketo / Braze)

Use the consent_updates model. For Marketo, sync to the Lead object on email match, mapping email_consent to the unsubscribed field (inverted: email_consent = false → unsubscribed = true). For Braze, use the User object and map to email_subscribe with values opted_in / unsubscribed.

sync:
  name: consent_to_marketo
  model: consent_updates
  destination: marketo
  object: Lead
  match_on:
    source: email
    destination: email
  field_mappings:
    - source: "CASE WHEN email_consent = 'true' THEN false ELSE true END"
      destination: unsubscribed
  mode: update
  schedule:
    type: interval
    frequency: 30m   # consent syncs run more frequently

Sync to SAP for Billing Address Updates

Hightouch supports SAP via the HTTP destination using SAP OData APIs. Map city and country back to the customer master, but only when city_source or country_source indicates a higher-priority system updated those fields:

-- Hightouch model: sap_address_updates
-- (resolved_attributes is tall — one row per attribute — so pivot here)
SELECT
  customer_id,
  MAX(CASE WHEN attribute_name = 'city'    THEN attribute_value END) AS city,
  MAX(CASE WHEN attribute_name = 'country' THEN attribute_value END) AS country,
  MAX(CASE WHEN attribute_name = 'city'    THEN source_system   END) AS city_source,
  MAX(CASE WHEN attribute_name = 'country' THEN source_system   END) AS country_source
FROM c360.silver.resolved_attributes
WHERE attribute_name IN ('city', 'country')
  AND source_system != 'sap'   -- only write back non-SAP winners
GROUP BY customer_id

Step 11 — Monitor Pipeline Health

Add two monitoring queries to a Databricks SQL Dashboard. The first flags sources that have gone quiet — no new events in the expected window:

-- Source freshness check
SELECT
  source_system,
  MAX(ingestion_time)                         AS last_event_at,
  DATEDIFF(HOUR, MAX(ingestion_time), NOW())  AS hours_since_last_event,
  CASE
    WHEN DATEDIFF(HOUR, MAX(ingestion_time), NOW()) > 4 THEN 'STALE'
    ELSE 'OK'
  END AS status
FROM c360.silver.customer_attribute_history
GROUP BY source_system
ORDER BY hours_since_last_event DESC;

The second surfaces resolution conflicts — attributes where two high-priority sources disagree on the current value, which often signals an upstream data quality problem worth investigating:

-- Conflict detection: multiple current values from different sources
SELECT
  customer_id,
  attribute_name,
  COUNT(DISTINCT attribute_value) AS distinct_values,
  COLLECT_LIST(source_system)     AS sources
FROM c360.silver.customer_attribute_history
WHERE is_current = TRUE
GROUP BY customer_id, attribute_name
HAVING COUNT(DISTINCT attribute_value) > 1
ORDER BY distinct_values DESC
LIMIT 100;
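The same check in miniature: given the current rows of the history table, flag every (customer, attribute) pair where different sources hold different current values.

```python
from collections import defaultdict

def find_conflicts(current_rows):
    """Group current rows by (customer_id, attribute_name) and flag
    groups where sources disagree on the current value — the in-memory
    analogue of the conflict-detection query."""
    groups = defaultdict(set)
    for r in current_rows:
        groups[(r["customer_id"], r["attribute_name"])].add(r["attribute_value"])
    return [key for key, values in groups.items() if len(values) > 1]
```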

Step 12 — Managing Gold Layer Versions with dbt

The extraction notebooks and MERGE logic in Steps 5–6 are PySpark-first — dbt has no native primitive for Delta Change Data Feed replay. But from the Silver → Gold boundary upward, the pipeline is pure SQL and dbt becomes the right tool. It gives you version-controlled models, co-existing schema versions with deprecation windows, and built-in data quality tests that catch breaking changes before they reach a destination.

Where dbt fits

The recommended split is: Databricks Jobs own the ingestion and MERGE layer; dbt owns the resolution view and gold profile. The job in Step 8 gains a final dbt run task that replaces the notebook-based gold rebuild:

  - task_key: rebuild_gold_profile
    depends_on:
      - task_key: merge_attribute_history
    dbt_task:
      project_directory: /dbt/customer_360
      commands:
        - dbt run --select gold
        - dbt test --select gold

Project setup

pip install dbt-databricks
dbt init customer_360
# profiles.yml
customer_360:
  target: prod
  outputs:
    prod:
      type: databricks
      host: <workspace>.azuredatabricks.net
      http_path: /sql/1.0/warehouses/<warehouse_id>
      token: "{{ env_var('DBT_TOKEN') }}"
      catalog: c360
      schema: gold
# dbt_project.yml
name: customer_360
version: 1.0.0
models:
  customer_360:
    gold:
      +materialized: table
      +contract:
        enforced: true    # schema contracts — dbt will error on column drift

Versioned gold models

dbt 1.5+ supports explicit model versioning. Define all versions in a single YAML file — dbt materialises them as separate tables (customer_profile_v1, customer_profile_v2) and keeps them live simultaneously until the deprecation date passes.

# models/gold/customer_profile.yml
models:
  - name: customer_profile
    latest_version: 2
    versions:
      - v: 1
        deprecation_date: "2026-09-01"   # v1 stays live; consumers get a migration window
        defined_in: customer_profile_v1
      - v: 2
        defined_in: customer_profile_v2
    columns:
      - name: customer_id
        data_type: string
        constraints:
          - type: not_null
      - name: email
        data_type: string
-- models/gold/customer_profile_v1.sql
-- Original flat schema — maintained for backward compatibility
SELECT
  customer_id,
  email,
  first_name,
  last_name,
  phone,
  mobile_phone,
  city,
  country,
  email_consent,
  email_consent_updated_at,
  profile_resolved_at
FROM {{ ref('resolved_attributes') }}
-- pivot logic identical to Step 7
-- models/gold/customer_profile_v2.sql
-- v2 adds source attribution columns and a nested extensions map
SELECT
  customer_id,
  email,
  email_source,
  email_updated_at,
  first_name,
  first_name_source,
  last_name,
  last_name_source,
  phone,
  phone_source,
  mobile_phone,
  city,
  city_source,
  country,
  country_source,
  email_consent,
  email_consent_updated_at,
  -- MACH ODM extensions namespace
  MAP(
    'sfdc_contact_id', sfdc_contact_id,
    'sap_kunnr',       sap_kunnr
  )                           AS extensions,
  profile_resolved_at
FROM {{ ref('resolved_attributes') }}

Downstream consumers pin to a version

Any model or Hightouch SQL query that depends on the gold profile pins explicitly to a version. When v2 ships, Hightouch models and BI queries can migrate on their own schedule; v1 stays live until its deprecation date.

-- Hightouch model (v1 pin — migrates before 2026-09-01)
SELECT * FROM c360.gold.customer_profile_v1
WHERE email IS NOT NULL

-- dbt downstream model (v2 already)
SELECT * FROM {{ ref('customer_profile', version=2) }}

Schema contracts and breaking-change tests

dbt's schema contracts enforce column types at materialisation time. Add generic tests to the gold models so a removed column or type mismatch fails the job before it reaches Hightouch:

# models/gold/customer_profile.yml (continued)
    columns:
      - name: email
        data_type: string
        tests:
          - not_null
          - dbt_utils.not_empty_string
      - name: email_consent
        data_type: string
        tests:
          - accepted_values:
              values: ['true', 'false']
      - name: profile_resolved_at
        data_type: timestamp
        tests:
          - not_null
          - dbt_utils.recency:
              datepart: hour
              field: profile_resolved_at
              interval: 4    # fails if gold hasn't refreshed in 4 hours

Handling ODM version upgrades

When the MACH Open Data Model releases a new version (e.g., ODM v2 adds a loyaltyTier canonical field), the change is isolated to three places: add the new canonical name to the attribute name registry, add the column to customer_profile_v2.sql, and create customer_profile_v3.sql if the change is breaking. The history table schema — customer_attribute_history — is never altered. It stores any canonical name as a string row; ODM versioning only affects how the serving layer reads those rows.

-- Adding a new canonical field is always non-breaking:
-- 1. Insert into registry
INSERT INTO c360.config.attribute_name_registry VALUES
('salesforce', 'Loyalty_Tier__c', 'loyaltyTier', 'v2', true);

-- 2. Add to customer_profile_v2.sql
--    MAX(CASE WHEN attribute_name = 'loyaltyTier' THEN attribute_value END) AS loyalty_tier,

-- 3. No schema migration needed on customer_attribute_history

Removing or renaming a canonical field is a breaking change — it warrants a new dbt model version with a deprecation window, not an in-place edit of the existing model.

What You Have Built

At this point the pipeline is fully operational: Fivetran syncs from Salesforce, SAP, Shopify, and Zendesk land in Bronze every two hours; Segment identify events stream in continuously; the Silver extraction notebooks run in parallel and feed a single MERGE into the attribute history table; the resolution algorithm produces a correct, source-attributed, field-level golden record in Gold; and Hightouch syncs the resolved profile back to every destination on a cadence appropriate to each use case.

The architecture is deliberately modular. Adding a new source — a Klaviyo email platform, a Loyalty system, a B2B data enrichment vendor — means adding rows to the attribute name registry, writing one extraction notebook following the same pattern, and adding a task to the job. The history table schema, the resolution algorithm, and the Hightouch syncs are unchanged. New destinations follow the same pattern on the Hightouch side. The framework scales horizontally by design.

The monitoring queries above are the operational heartbeat. Run them on a schedule and alert on STALE sources before they cause a resolution gap. The conflict detection query is your data quality radar — a spike in conflicts typically precedes a visible customer experience problem by one or two pipeline cycles, giving you time to fix the root cause before it reaches a customer.

Frequently Asked Questions

How do you implement Customer 360 on Databricks?

Customer 360 on Databricks uses a medallion architecture: Bronze tables store raw source events with Delta Change Data Feed enabled; Silver holds the attribute history and resolves field-level conflicts via an incremental MERGE over Change Data Feed events; Gold materialises the resolved customer profile. dbt manages Silver-to-Gold transformation with versioned models and schema contracts.

What is Hightouch used for in a Customer 360 architecture?

Hightouch is the activation layer — it reads the resolved Gold customer profile from Databricks and syncs it back to CRM, marketing automation, ad platforms, and service tools via reverse ETL. It removes the need for custom sync code per destination and handles incremental updates, schema mapping, and error handling.

What is the difference between Tier 1 and Tier 2 sources?

Tier 1 sources provide native field-level timestamps — each attribute carries its own event time. Segment's identify() call is Tier 1. Tier 2 sources provide only record-level timestamps; field-level change events must be derived by comparing consecutive snapshots at ingestion. Salesforce, SAP, Shopify, and Zendesk are Tier 2 for most fields.
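That Tier 2 derivation can be sketched concretely: given two consecutive snapshots of the same record, emit a field-level event for each field whose value changed, stamped with the record-level timestamp (the best available approximation of the field's event time).

```python
def diff_snapshots(prev, curr, fields, event_time):
    """Derive field-level change events from two consecutive
    record-level snapshots — the core of Tier 2 extraction.
    Every changed field inherits the record-level timestamp."""
    return [
        {"field": f, "value": curr.get(f), "event_time": event_time}
        for f in fields
        if curr.get(f) != prev.get(f)
    ]
```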

Which source systems are covered in this Customer 360 implementation?

The implementation covers five enterprise source systems: Salesforce (CRM, via Fivetran CDC), SAP ECC / S4HANA (via Fivetran, using LAEDA as event time), Shopify (e-commerce, via Fivetran and webhook), Segment (CDP identify events with native field-level timestamps), and Zendesk (support, via Fivetran). Each has source-specific extraction logic and canonical field mapping.

Santosh Pradhan

MarTech Solutions Architect · Munich