
Building an Enterprise Compliance Pipeline with Coalesce

    Overview

    This hands-on lab teaches you how to build an enterprise-grade regulatory compliance pipeline using Coalesce and Snowflake. You'll create a system that tracks customer history, efficiently processes large transaction volumes using incremental patterns, and generates audit reports for SOX and AML compliance.

    What You'll Need

    • A Coalesce account (either a trial account created using Snowflake Partner Connect or access to an existing account).
    • Basic knowledge of SQL and data warehousing concepts.
    • A Snowflake account with access to the SNOWFLAKE_SAMPLE_DATA.TPCDS_SF100TCL database.
      • This is a very large database, so it's recommended to work from a small sample of the data, specifically the STORE_SALES table (the smaller TPCDS_SF10TCL schema is a convenient source). Try the following; a quick sanity check appears after this list:

        CREATE TABLE YOUR_DB.REGULATORY_RAW.STORE_SALES_SAMPLE AS
        SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCDS_SF10TCL.STORE_SALES
        SAMPLE SYSTEM (0.23); -- Adjust the percentage to land at roughly 65M rows
      • A Snowflake warehouse (SMALL or larger recommended)
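
    After creating the sample, a quick check in Snowflake confirms the volume and date range (a sketch using the same YOUR_DB placeholder; exact counts depend on your sample):

        -- Confirm the sample landed with roughly the expected volume (~65M rows)
        SELECT COUNT(*) AS row_count,
               MIN("SS_SOLD_DATE_SK") AS min_date_sk,
               MAX("SS_SOLD_DATE_SK") AS max_date_sk
        FROM YOUR_DB.REGULATORY_RAW.STORE_SALES_SAMPLE;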

    What You'll Build

    • A complete compliance data pipeline with incremental processing
    • Customer dimension with full audit history (SCD Type 2)
    • Automated SOX and AML compliance reporting
    • High-performance transaction processing system
    This image shows a Coalesce data pipeline for a regulatory compliance project. It features various Nodes such as source tables, incremental loads, persistent stages, facts, and views. The pipeline is fully built and ready to execute.

    What You'll Learn

    • How to implement incremental data loading patterns
    • How to create slowly changing dimensions for audit trails
    • How to build compliance risk assessments
    • How to generate regulatory reports
    • How to simulate and test incremental loads

    Understanding the Source Data

    The TPC-DS dataset simulates a retail company's data warehouse, providing realistic volumes and relationships for testing enterprise-scale data pipelines. For our compliance pipeline, we'll focus on two critical tables that form the foundation of customer transaction tracking.

    CUSTOMER Table

    The CUSTOMER table contains master data for all customers in the retail system. This table serves as the source for our slowly changing dimension, allowing us to track customer information changes over time for compliance purposes.

    Key Columns:

    Column Name | Description | Usage in Pipeline
    C_CUSTOMER_SK | Surrogate key uniquely identifying each customer | Business key for SCD Type 2 tracking
    C_FIRST_NAME | Customer's first name | Track name changes for compliance
    C_LAST_NAME | Customer's last name | Track name changes for compliance
    C_EMAIL_ADDRESS | Customer's email address | Risk indicator when missing
    C_PREFERRED_CUST_FLAG | Flag indicating preferred customer status (Y/N) | Used in risk assessment calculations

    Data Volume

    The CUSTOMER table contains approximately 65 million customer records in the SF100TCL dataset, providing a realistic volume for testing enterprise compliance systems.

    STORE_SALES Table

    The STORE_SALES table captures every retail transaction, making it ideal for demonstrating high-volume incremental processing patterns. Each row represents a line item from a sales transaction.

    Key Columns:

    Column Name | Description | Usage in Pipeline
    SS_SOLD_DATE_SK | Date key indicating when the sale occurred | Incremental load column (high-water mark)
    SS_CUSTOMER_SK | Foreign key linking to the customer | Joins to customer dimension
    SS_ITEM_SK | Product identifier | Part of composite business key
    SS_TICKET_NUMBER | Unique transaction identifier | Ensures transaction uniqueness
    SS_NET_PAID | Net amount paid after discounts | AML threshold monitoring ($10,000+)

    Large Dataset Warning

    The STORE_SALES table contains over 2.8 billion rows in the SF100TCL dataset. This massive volume is why we implement incremental loading patterns rather than processing the entire table repeatedly.

    Data Relationships

    The tables are connected through a foreign key relationship:

    • STORE_SALES.SS_CUSTOMER_SK → CUSTOMER.C_CUSTOMER_SK

    This relationship allows us to:

    • Enrich transactions with current customer information
    • Track customer behavior over time
    • Identify high-risk transactions based on customer profiles
    • Generate comprehensive compliance reports
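
    For example, enriching transactions with customer identity is a single join over these keys. The sketch below assumes both source tables live in your REGULATORY_RAW schema, as in the Add Sources screenshot later in this guide; quoting follows the TPC-DS uppercase convention:

        -- Attach customer identity to each transaction line item
        SELECT ss."SS_TICKET_NUMBER",
               ss."SS_NET_PAID",
               c."C_FIRST_NAME",
               c."C_LAST_NAME"
        FROM YOUR_DB.REGULATORY_RAW.STORE_SALES_SAMPLE ss
        JOIN YOUR_DB.REGULATORY_RAW.CUSTOMER c
          ON ss."SS_CUSTOMER_SK" = c."C_CUSTOMER_SK";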

    Compliance Relevance

    These tables simulate common compliance scenarios:

    • Customer Due Diligence: The CUSTOMER table provides identity information required for Know Your Customer (KYC) processes
    • Transaction Monitoring: STORE_SALES enables detection of suspicious patterns and high-value transactions
    • Audit Trail: The combination supports complete transaction lineage from customer to purchase
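
    As a concrete illustration of transaction monitoring, the AML threshold used throughout this pipeline reduces to a simple filter (a sketch against the raw sample table; the same $10,000 rule is encoded as a transaction_flag column later on):

        -- Surface transactions above the $10,000 AML review threshold
        SELECT "SS_TICKET_NUMBER", "SS_CUSTOMER_SK", "SS_NET_PAID"
        FROM YOUR_DB.REGULATORY_RAW.STORE_SALES_SAMPLE
        WHERE "SS_NET_PAID" > 10000;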
    Processing Large Datasets

    This tutorial uses the TPC-DS 100 TB dataset, which contains over 2.8 billion rows in the STORE_SALES table. We'll use a simulation pattern to demonstrate incremental loading without processing the entire dataset.

    Setting Up Your Environment

    Let's configure your Coalesce workspace with the necessary packages and storage locations for our compliance pipeline.

    Install Required Packages

    1. Navigate to Build Settings > Packages.

    2. Install the following packages:

      This shows a data management interface with a left panel displaying different Node types (Dimension, Fact, Incremental Load, etc.) and their associated tables. The right panel shows installed packages: baseNodeTypes v1.3.4 and incrementalLoading v1.1.3, both published by coalesce and currently in use.
      Build Settings - Package Install
    3. Give the packages a name. It's recommended to choose a name that reflects either the package itself or its role in your DAG.

    Configure Storage Locations

    1. Navigate to Build Settings > Storage Locations.

    2. Create three Storage Locations to organize your compliance data:

      Storage Location | Purpose
      REGULATORY_RAW | Source data ingestion
      REGULATORY_STAGE | Data transformation and processing
      REGULATORY_MART | Final compliance reports and analytics

    3. Go to Workspace and select the Workspace you're using.

    4. Click Storage Mappings.

    5. Update your Storage Mappings to match your database and schema.

    This shows workspace settings for Regulatory Compliance with Financial Services Audit Trail. The Storage Location Mapping section displays three database mappings: REGULATORY_MART, REGULATORY_RAW, and REGULATORY_STAGE, each connected to DOCUMENTATION_GUIDES database with corresponding schemas.
    In this example, the Storage Mapping names match the schema names.
    Best Practice

    Separating data into distinct Storage Locations helps maintain clear data lineage and simplifies compliance auditing.

    Connecting to Source Data

    Now we'll connect to the Snowflake sample data that simulates our enterprise transaction system.

    1. Click the + icon in the Build interface and select Add Sources.

    2. Navigate to the SNOWFLAKE_SAMPLE_DATA.TPCDS_SF100TCL database, or to the database containing your sample data.

    3. Select the following tables:

      • CUSTOMER - Customer data
      • STORE_SALES - Transaction records
    4. Click Add Sources to import the tables into your pipeline.

      This shows an Add Sources to SQL Pipeline dialog with a hierarchical view of database sources. Under REGULATORY_RAW, two items are selected: CUSTOMER and STORE_SALES_SAMPLE. The interface shows REGULATORY_MART, REGULATORY_RAW, and REGULATORY_STAGE databases with the option to add 2 sources.
      Add Sources

    Building the Customer Dimension

    To maintain a complete audit trail of customer changes for SOX compliance, we'll implement a Type 2 Slowly Changing Dimension.
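
    Once the dimension is built, it lets you reconstruct any customer's change history directly (a sketch assuming Coalesce's standard SYSTEM_START_DATE/SYSTEM_END_DATE versioning columns, which this guide relies on later; the customer key shown is a hypothetical example):

        -- One row per version; the current version carries the open-ended end date
        SELECT "C_CUSTOMER_SK", "C_FIRST_NAME", "C_LAST_NAME",
               "SYSTEM_START_DATE", "SYSTEM_END_DATE"
        FROM YOUR_DB.REGULATORY_STAGE.DIM_CUSTOMER_MAIN
        WHERE "C_CUSTOMER_SK" = 12345  -- hypothetical business key
        ORDER BY "SYSTEM_START_DATE";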

    Create the Dimension Node

    1. Right-click the CUSTOMER source Node and select Add Node > Base Node Types Package > Dimension.

      This displays a data pipeline graph view with CUSTOMER and STORE_SALES_SAMPLE Nodes. A context menu shows options like Edit, Add Node, Copy, and Run Node, with submenus revealing Node types including Dimension, Fact, Stage, and Persistent Stage. The bottom shows customer data columns.
      Right-click on the Customer Node to add the Dimension Node.
    2. Rename the Node to DIM_CUSTOMER_MAIN.

    3. Configure the Node properties:

      • Storage Location: REGULATORY_STAGE
    This shows the DIM_CUSTOMER_MAIN dimension Node configuration in Coalesce. The mapping table displays customer columns with data types (NUMBER, VARCHAR). The right panel shows Storage Location as REGULATORY_STAGE, Node Type as Dimension, and Deploy Enabled. Status indicates Validated, Created, Data Loaded.
    Set the Storage Location to REGULATORY_STAGE

    Configure Change Tracking and Business Keys

    1. In the Options section, go to Business Key.

    2. Select the column:

      1. C_CUSTOMER_SK
    3. Go to Change Tracking and select the following columns:

      1. C_FIRST_NAME
      2. C_LAST_NAME
      3. C_EMAIL_ADDRESS
      4. C_PREFERRED_CUST_FLAG
      This shows the DIM_CUSTOMER_MAIN dimension configuration with expanded Business Key and Change Tracking panels on the right. Both sections display searchable lists of customer columns like C_CUSTOMER_ID, C_CUSTOMER_SK, and other customer attributes for configuration selection.

    Add Compliance Risk Assessment

    You'll add a compliance_risk_flag column. This risk flag helps identify customers requiring additional review based on missing contact information or preferred status. You can optionally add an audit_timestamp if required for compliance.

    1. In the Mapping Grid, scroll to the bottom and add a new column by clicking in the empty row.

    2. Configure the new column:

      • Column Name: compliance_risk_flag
      • Data Type: VARCHAR(10)
      • Transformation:

        CASE
          WHEN "CUSTOMER"."C_PREFERRED_CUST_FLAG" = 'Y' THEN 'GREEN'
          WHEN "CUSTOMER"."C_EMAIL_ADDRESS" IS NULL THEN 'YELLOW'
          ELSE 'GREEN'
        END
    3. Click Create to build the Dimension Node.

    This shows the Column Editor for DIM_CUSTOMER_MAIN with a compliance_risk_flag column being configured using a CASE statement. The SQL logic checks customer preferences and email addresses to assign GREEN or YELLOW risk flags. Two rows are highlighted: compliance_risk_flag and audit_timestamp.
    You can use the Column Editor tab to simplify creating transformations.

    Implementing Incremental Processing

    The STORE_SALES table contains billions of rows, making full table scans impractical. We'll implement an incremental loading pattern to process only new transactions.
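
    Conceptually, the high-water-mark pattern configured below reduces to the following (a minimal hand-written sketch, not the SQL Coalesce generates for you):

        -- Only pull rows newer than the latest date key already persisted
        SELECT *
        FROM STG_SALES_FILTERED
        WHERE "SS_SOLD_DATE_SK" >
              (SELECT COALESCE(MAX("SS_SOLD_DATE_SK"), 0)
               FROM PSTG_STORE_SALES_HWM);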

    Understanding the Challenge

    Large Dataset Simulation

    The STORE_SALES table is massive (2.8+ billion rows) and static. To demonstrate incremental loading in this tutorial, we'll use a filtering pattern that simulates new data arriving over time.

    Create the Filtering View

    This view will control which data is visible to our incremental logic, simulating a real-world scenario where new data arrives continuously.

    1. Right-click the STORE_SALES source Node and select Add Node > Base Node Types Package > View.

    2. Rename the Node to STG_SALES_FILTERED.

    3. Set the Storage Location to REGULATORY_STAGE.

    4. In Options, enable Override Create SQL.

    5. In the Create SQL, update the script to include the following WHERE clause: WHERE "SS_SOLD_DATE_SK" < 2452000.

      STG_SALES_FILTERED Create SQL Full Example
       {{ stage('Override Create SQL') }}
      CREATE OR REPLACE VIEW {{ ref('REGULATORY_STAGE', 'STG_SALES_FILTERED') }} AS (
      SELECT
      "SS_SOLD_DATE_SK" AS "SS_SOLD_DATE_SK",
      "SS_SOLD_TIME_SK" AS "SS_SOLD_TIME_SK",
      "SS_ITEM_SK" AS "SS_ITEM_SK",
      "SS_CUSTOMER_SK" AS "SS_CUSTOMER_SK",
      "SS_CDEMO_SK" AS "SS_CDEMO_SK",
      "SS_HDEMO_SK" AS "SS_HDEMO_SK",
      "SS_ADDR_SK" AS "SS_ADDR_SK",
      "SS_STORE_SK" AS "SS_STORE_SK",
      "SS_PROMO_SK" AS "SS_PROMO_SK",
      "SS_TICKET_NUMBER" AS "SS_TICKET_NUMBER",
      "SS_QUANTITY" AS "SS_QUANTITY",
      "SS_WHOLESALE_COST" AS "SS_WHOLESALE_COST",
      "SS_LIST_PRICE" AS "SS_LIST_PRICE",
      "SS_SALES_PRICE" AS "SS_SALES_PRICE",
      "SS_EXT_DISCOUNT_AMT" AS "SS_EXT_DISCOUNT_AMT",
      "SS_EXT_SALES_PRICE" AS "SS_EXT_SALES_PRICE",
      "SS_EXT_WHOLESALE_COST" AS "SS_EXT_WHOLESALE_COST",
      "SS_EXT_LIST_PRICE" AS "SS_EXT_LIST_PRICE",
      "SS_EXT_TAX" AS "SS_EXT_TAX",
      "SS_COUPON_AMT" AS "SS_COUPON_AMT",
      "SS_NET_PAID" AS "SS_NET_PAID",
      "SS_NET_PAID_INC_TAX" AS "SS_NET_PAID_INC_TAX",
      "SS_NET_PROFIT" AS "SS_NET_PROFIT"
      FROM {{ ref('REGULATORY_RAW', 'STORE_SALES_SAMPLE') }}
      WHERE "SS_SOLD_DATE_SK" < 2452000
      )
    6. Click Create to build the view.

    Create the Incremental Logic

    Now we'll add the high-water mark logic that identifies new records.

    1. Right-click STG_SALES_FILTERED and select Add Node > Incremental Loading > Incremental Load.

    2. Rename the Node to INC_STORE_SALES_HWM.

    3. In the Options tab, configure:

      • Materialization: View
      • Filter data based on Persistent Table: True
      • Persistent Table Location: REGULATORY_STAGE
      • Persistent Table Name: PSTG_STORE_SALES_HWM
      • Incremental Load Column: SS_SOLD_DATE_SK
      Incremental load configuration screen
    4. Navigate to the Join tab and click Generate Join.

    5. Click Copy to Editor to populate the SQL.

    6. Verify the generated SQL. You'll need to change the default value '1900-01-01' to 0.

      -- Find this line:
      SELECT COALESCE(MAX("SS_SOLD_DATE_SK"), '1900-01-01')

      -- Correct it to:
      SELECT COALESCE(MAX("SS_SOLD_DATE_SK"), 0)

      -- Final SQL

      FROM {{ ref('REGULATORY_STAGE', 'STG_SALES_FILTERED') }} "STG_SALES_FILTERED"
      WHERE "STG_SALES_FILTERED"."SS_SOLD_DATE_SK" >
      (SELECT COALESCE(MAX("SS_SOLD_DATE_SK"), 0)
      FROM {{ ref_no_link('REGULATORY_STAGE', 'PSTG_STORE_SALES_HWM') }} )

    7. Create the View.

    Critical Fix

    The generated code may use a date string as the default value. Since SS_SOLD_DATE_SK is an integer, you must change this to 0 or the query will fail.

    Create the Persistent Staging Table

    This table stores the incrementally loaded transaction data.

    1. Right-click INC_STORE_SALES_HWM and select Add Node > Base Node Types Package > Persistent Stage.

    2. Rename the Node to PSTG_STORE_SALES_HWM.

    3. Configure the Node:

      • Storage Location: REGULATORY_STAGE
      • Business Key:
        • SS_SOLD_DATE_SK
        • SS_CUSTOMER_SK
        • SS_ITEM_SK
        • SS_TICKET_NUMBER

    Add Compliance Columns

    1. In the Mapping Grid, add two custom columns:

      Transaction Flag Column:

      • Column Name: transaction_flag
      • Data Type: VARCHAR(15)
      • Transformation:

        CASE
          WHEN "INC_STORE_SALES_HWM"."SS_NET_PAID" > 10000 THEN 'AML_REVIEW'
          ELSE 'NORMAL'
        END

      Audit Timestamp Column (optional):

      • Column Name: audit_processed_timestamp
      • Data Type: TIMESTAMP_NTZ
      • Transformation:
      CURRENT_TIMESTAMP()
      System Generated Columns

      You'll notice that some columns are generated automatically. These can be used for compliance, or you can add a custom column such as audit_processed_timestamp. In this example, SYSTEM_UPDATE_DATE tracks the last modification to each record. Learn more about System Columns.

    2. Click Create to build the staging table.

    Running the Initial Load

    Let's perform the first run to load our partial dataset.

    1. In Snowflake, ensure the target table is empty:

      TRUNCATE TABLE YOUR_DB.REGULATORY_STAGE.PSTG_STORE_SALES_HWM;
    2. In Coalesce, click on PSTG_STORE_SALES_HWM and click Run.

    3. Monitor the run progress. This will load all records where SS_SOLD_DATE_SK < 2452000.

    Performance Check

    Note the execution time and row count. This establishes your baseline for comparing incremental load performance.
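
    A quick query captures both numbers for your baseline (a sketch using the same YOUR_DB placeholder as the truncate step):

        -- Baseline after the initial load: row count and current high-water mark
        SELECT COUNT(*) AS loaded_rows,
               MAX("SS_SOLD_DATE_SK") AS high_water_mark
        FROM YOUR_DB.REGULATORY_STAGE.PSTG_STORE_SALES_HWM;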

    Creating the Regulatory Fact Table

    This fact table combines transaction data with current customer information for compliance analysis.

    Build the Fact Node

    1. Right-click PSTG_STORE_SALES_HWM and select Add Node > Base Node Types Package > Fact.
    2. Rename the Node to FCT_STORE_SALES_SAMPLE.
    3. Configure the Node:
      • Storage Location: REGULATORY_MART
      • Business Key:
        • SS_SOLD_DATE_SK
        • SS_CUSTOMER_SK
        • SS_ITEM_SK
        • SS_TICKET_NUMBER

    Join Customer Dimension

    Navigate to the Join tab and enter:

    FROM {{ ref('REGULATORY_STAGE','PSTG_STORE_SALES_HWM') }} "PSTG_STORE_SALES_HWM"
    JOIN {{ ref('REGULATORY_STAGE','DIM_CUSTOMER_MAIN') }} "DIM_CUSTOMER_MAIN"
    ON "PSTG_STORE_SALES_HWM"."SS_CUSTOMER_SK" = "DIM_CUSTOMER_MAIN"."C_CUSTOMER_SK"
    AND "DIM_CUSTOMER_MAIN"."SYSTEM_END_DATE" = '2999-12-31 00:00:00'
    Current Records Only

    The JOIN condition on SYSTEM_END_DATE ensures we only use the current version of each customer record.
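
    You can confirm this assumption directly in Snowflake (a sketch with the placeholder database; a well-formed SCD Type 2 table returns zero rows here):

        -- There should be exactly one current version per customer key
        SELECT "C_CUSTOMER_SK", COUNT(*) AS current_versions
        FROM YOUR_DB.REGULATORY_STAGE.DIM_CUSTOMER_MAIN
        WHERE "SYSTEM_END_DATE" = '2999-12-31 00:00:00'
        GROUP BY "C_CUSTOMER_SK"
        HAVING COUNT(*) > 1;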

    Add Risk Assessment

    1. In the Mapping Grid, add a custom column:

      • Column Name: customer_risk_assessment
      • Data Type: VARCHAR(20)
      • Transformation:

        CASE
          WHEN "DIM_CUSTOMER_MAIN"."compliance_risk_flag" = 'YELLOW' AND "PSTG_STORE_SALES_HWM"."SS_NET_PAID" > 10000 THEN 'HIGH_RISK'
          WHEN "PSTG_STORE_SALES_HWM"."SS_NET_PAID" > 10000 THEN 'MEDIUM_RISK'
          ELSE 'LOW_RISK'
        END
    2. The Fact Node automatically creates its own SYSTEM_CREATE_DATE and SYSTEM_UPDATE_DATE columns, duplicating the ones inherited from PSTG_STORE_SALES_HWM. Right-click each of the automatically created columns in FCT_STORE_SALES_SAMPLE and select Delete Column. Keep the columns from PSTG_STORE_SALES_HWM to preserve the audit history; the Fact table's own versions would only track when rows were inserted into the Fact table.

      This displays the FCT_STORE_SALES_SAMPLE fact table configuration with multiple tabs open. The mapping shows sales columns like SS_NET_PROFIT and system fields. Two red-highlighted rows indicate duplicate SYSTEM_CREATE_DATE and SYSTEM_UPDATE_DATE entries. Status shows Validated and Created.
      Delete the SYSTEM_CREATE_DATE and SYSTEM_UPDATE_DATE.
    3. Click Create and Run to build the Fact table.

    Build the Compliance Reports

    Customer 360 View

    Create a comprehensive customer profile with transaction history.

    1. Right-click DIM_CUSTOMER_MAIN and select Add Node > Base Node Types Package > View.

    2. Rename to CUSTOMER_360_VIEW.

    3. Change the Storage Location to REGULATORY_MART.

    4. In the Join tab, add:

       FROM {{ ref('REGULATORY_STAGE','DIM_CUSTOMER_MAIN') }} "DIM_CUSTOMER_MAIN"
      LEFT JOIN {{ ref('REGULATORY_MART','FCT_STORE_SALES_SAMPLE') }} "FCT_STORE_SALES_SAMPLE"
      ON "DIM_CUSTOMER_MAIN"."C_CUSTOMER_SK" = "FCT_STORE_SALES_SAMPLE"."SS_CUSTOMER_SK"
      WHERE "DIM_CUSTOMER_MAIN"."SYSTEM_END_DATE" = '2999-12-31 00:00:00'
    5. In Options, toggle Group By All to True.

    6. In the Mapping Grid, add aggregation columns with the following transformations:

      total_transactions (NUMBER) - COUNT("FCT_STORE_SALES_SAMPLE"."SS_TICKET_NUMBER")

      total_amount (NUMBER) - COALESCE(SUM("FCT_STORE_SALES_SAMPLE"."SS_NET_PAID"), 0)

      high_risk_transaction_count (NUMBER) - COUNT(CASE WHEN "FCT_STORE_SALES_SAMPLE"."customer_risk_assessment" = 'HIGH_RISK' THEN 1 END)
    7. Create the Node.

    Exception Detection View

    Monitor high-risk transactions requiring regulatory review.

    1. Right-click FCT_STORE_SALES_SAMPLE and select Add Node > Base Node Types Package > View.

    2. Rename to EXCEPTION_DETECTION_VIEW.

    3. Change the Storage Location to REGULATORY_MART.

    4. In the Join tab, add the WHERE clause:

       FROM {{ ref('REGULATORY_MART', 'FCT_STORE_SALES_SAMPLE') }} "FCT_STORE_SALES_SAMPLE"
      WHERE "FCT_STORE_SALES_SAMPLE"."customer_risk_assessment" = 'HIGH_RISK'
      OR "FCT_STORE_SALES_SAMPLE"."SS_NET_PAID" > 10000
    5. Create the Node.

    Monthly Regulatory Report

    Create an audit report snapshot for compliance reporting.

    1. Right-click FCT_STORE_SALES_SAMPLE and select Add Node > Base Node Types Package > Fact.

    2. Rename to REG_MONTHLY_REPORTS.

    3. Delete all existing columns, then add the following columns:

      Column Name | Transformation | Data Type
      report_date | DATE_TRUNC('MONTH', CURRENT_DATE()) | DATE
      customer_risk_category | "FCT_STORE_SALES_SAMPLE"."customer_risk_assessment" | VARCHAR(20)
      customer_count | COUNT(DISTINCT "FCT_STORE_SALES_SAMPLE"."SS_CUSTOMER_SK") | NUMBER
      transaction_count | COUNT(*) | NUMBER
      total_amount | SUM("FCT_STORE_SALES_SAMPLE"."SS_NET_PAID") | NUMBER(38,2)
      report_generated_timestamp | CURRENT_TIMESTAMP() | TIMESTAMP_NTZ
    4. Configure the Business Key:

      • report_date
      • customer_risk_category
    5. In the Join tab:

       FROM {{ ref('REGULATORY_MART', 'FCT_STORE_SALES_SAMPLE') }} "FCT_STORE_SALES_SAMPLE"
      GROUP BY 1, 2
    6. Create the Node.

    Why a Fact Node?

    Regulatory reports must be static, historical snapshots. A Fact Node materializes the data, creating a permanent record for each reporting period. Views would show live data, which isn't suitable for audit purposes.
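
    Once a period has been materialized, auditors can pull the frozen snapshot with an ordinary query (a sketch with the placeholder database; column quoting matches the lowercase names defined above):

        -- Retrieve the materialized snapshot for each reporting month
        SELECT "report_date", "customer_risk_category",
               "customer_count", "transaction_count", "total_amount"
        FROM YOUR_DB.REGULATORY_MART.REG_MONTHLY_REPORTS
        ORDER BY "report_date", "customer_risk_category";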

    Testing the Incremental Load

    Now let's simulate new data arriving and verify our incremental processing works correctly.

    Run the Pipeline

    1. Return to the STG_SALES_FILTERED view.

    2. Update the SQL to raise the filter boundary. This simulates new data arriving without reprocessing every row.

       WHERE "SS_SOLD_DATE_SK" < 2452100
    3. Click Create.

    4. Go to the DAG and click Run All.

    5. Verify the new data appears in your reports by opening one of them and clicking Fetch Data if the data isn't already loaded.

    This image shows the REG_MONTHLY_REPORTS_TEST Node in Coalesce, displaying the column mappings, transformations, and sample data. The data preview at the bottom includes risk-tiered customer metrics such as customer count, transaction count, and total amount for July 2025. The Fetch Data option is highlighted, indicating recent data retrieval.
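
    To confirm that only the simulated new window was processed, check the persistent table against the filter boundaries (a sketch; exact counts depend on your sample):

        -- Rows added by the incremental run fall inside the new window
        SELECT COUNT(*) AS new_rows
        FROM YOUR_DB.REGULATORY_STAGE.PSTG_STORE_SALES_HWM
        WHERE "SS_SOLD_DATE_SK" >= 2452000
          AND "SS_SOLD_DATE_SK" < 2452100;

        -- The high-water mark should now sit just below 2452100
        SELECT MAX("SS_SOLD_DATE_SK") AS high_water_mark
        FROM YOUR_DB.REGULATORY_STAGE.PSTG_STORE_SALES_HWM;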

    Conclusion and Next Steps

    Congratulations! You've built a production-ready compliance pipeline that demonstrates enterprise data engineering best practices.

    What You've Accomplished

    • Created an efficient incremental loading system
    • Implemented audit-compliant customer history tracking
    • Built automated compliance reporting
    • Established a scalable pipeline architecture

    Next Steps

    • Explore adding data quality checks using Coalesce's testing framework
    • Implement automated scheduling for regular report generation
    • Add additional compliance rules based on your organization's needs
    • Consider integrating with external compliance systems