
Deduplicating Company Records in a Multi-Source Data Centralization Project using dbt, Google BigQuery or Snowflake

One of the most common tasks in a data centralization project is to create single, deduplicated records for each of the companies, contacts, products and other entities the business interacts with.

Doing this allows you to connect sales activity from Salesforce and Hubspot to project delivery and invoicing data from Jira and Xero, for example:

[Image: cross-company.png]

Doing this in practice can, however, get confusing and complicated quite quickly, as a recent thread on the dbt Slack forum discussed. To summarise the complications this adds to a typical data centralization project:

  • Assuming you’re going to create a single record for each company, contact, product and so on, how do you store the source system IDs for those entities in such a way that you can link the incoming deals, orders, invoices and payments from those source systems to these new records?

  • What happens if some sources of company data don’t provide all of the fields that the main source provides? A CRM source such as HubSpot will typically provide dozens of data fields for each company, whereas others may only provide a name and billing address.

  • What if one source has the concept of a company or organisation in how it works but doesn’t provide a table of companies in its data extract; for example, when you’re working with Jira as a data source and each project is named after one of your clients, but there are only project and issue tables in the data export and no explicit “company” table?

  • And what if two of the companies, contacts or products you need to deduplicate don’t naturally merge together on the name, code or other field you use for this purpose?

All of this is solvable with enough thinking, effort and coding, but doing so whilst keeping your project agile, delivering fast and not letting it descend into a spaghetti mess of hacked-together code can be a challenge. Here’s how we do this on our own client data centralization projects, using Google BigQuery or Snowflake as the data warehousing platform, dbt (“Data Build Tool”) and our open-source RA Warehouse for dbt framework, available on Github as a public repo.

Step 1: Standardise Entity Staging Data Sources

Customers, contacts, projects and other shared dimensions are created automatically from all data sources, deduplicated by name and by merge lookup files, using a process that preserves source system keys whilst assigning a unique ID to each customer, contact and so on, as shown in the diagram below.

[Image: method.png]

Each dbt staging module (a folder within a dbt project) provides a unique ID, prefixed with the source name, and another field value (for example, user name) that can be used for deduplicating dimension members downstream.

WITH source AS (
    {{ filter_stitch_relation(relation=var('stg_hubspot_crm_stitch_companies_table'), unique_column='companyid') }}
),
renamed AS (
    SELECT
        CONCAT('{{ var('stg_hubspot_crm_id-prefix') }}', companyid) AS company_id,
        REPLACE(REPLACE(REPLACE(properties.name.value, 'Limited', ''), 'ltd', ''), ', Inc.', '') AS company_name,
        properties.address.value AS company_address,
        properties.address2.value AS company_address2,
        properties.city.value AS company_city,
        properties.state.value AS company_state,
        properties.country.value AS company_country,
        properties.zip.value AS company_zip,
        properties.phone.value AS company_phone,
        properties.website.value AS company_website,
        properties.industry.value AS company_industry,
        properties.linkedin_company_page.value AS company_linkedin_company_page,
        properties.linkedinbio.value AS company_linkedin_bio,
        properties.twitterhandle.value AS company_twitterhandle,
        properties.description.value AS company_description,
        CAST(NULL AS STRING) AS company_finance_status,
        CAST(NULL AS STRING) AS company_currency_code,
        properties.createdate.value AS company_created_date,
        properties.hs_lastmodifieddate.value AS company_last_modified_date
    FROM
        source
)
SELECT
    *
FROM
    renamed

As long as a data source can provide an ID and a field by which you can connect its records to the ones from the other source systems, you can just pass null values for the ones that are missing, like this:

WITH source AS (
    {{ filter_stitch_relation(relation=var('stg_jira_projects_stitch_projects_table'), unique_column='id') }}
),
renamed as (
    select * from (
        SELECT
            concat('{{ var('stg_jira_projects_id-prefix') }}', replace(name, ' ', '_')) AS company_id,
            name AS company_name,
            cast(null as string) as company_address,
            cast(null as string) AS company_address2,
            cast(null as string) AS company_city,
            cast(null as string) AS company_state,
            cast(null as string) AS company_country,
            cast(null as string) AS company_zip,
            cast(null as string) AS company_phone,
            cast(null as string) AS company_website,
            cast(null as string) AS company_industry,
            cast(null as string) AS company_linkedin_company_page,
            cast(null as string) AS company_linkedin_bio,
            cast(null as string) AS company_twitterhandle,
            cast(null as string) AS company_description,
            cast(null as string) as company_finance_status,
            cast(null as string) as company_currency_code,
            cast(null as timestamp) as company_created_date,
            cast(null as timestamp) as company_last_modified_date
        FROM source
    )
    {{ dbt_utils.group_by(n=19) }}
)
select * from renamed

If you’re wondering what {{ filter_stitch_relation … }} is referring to, it’s a dbt macro we use to filter out the historic (append-only) versions of rows you get when using Stitch to replicate SaaS data sources into Google BigQuery:

{%- macro filter_stitch_relation(relation, unique_column) -%}
SELECT
    *
FROM (
    SELECT
        *,
        MAX(_sdc_batched_at) OVER (PARTITION BY {{ unique_column }} ORDER BY _sdc_batched_at
                                   RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS max_sdc_batched_at
    FROM
        {{ relation }}
)
WHERE
    _sdc_batched_at = max_sdc_batched_at
{%- endmacro -%}

Step 2: Union Staging Data Sources Together

A variable we’ve created within dbt_project.yml lists all of the staging data sources that provide company records, named in a standard way so that prepending “stg” to the data source name and appending “_staging” gives the name of the module (folder) within the dbt project.

crm_warehouse_company_sources: ['hubspot_crm','harvest_projects','xero_accounting','stripe_payments','asana_projects','jira_projects']

The “Integration” module for company data sources then uses the list of sources provided by this variable to drive a Jinja “for” loop that unions those sources together into a single CTE (common table expression).

with t_companies_pre_merged as (
    {% for source in var('crm_warehouse_company_sources') %}
    {% set relation_source = 'stg_' + source + '_companies' %}
    select
        '{{ source }}' as source,
        *
    from {{ ref(relation_source) }}
    {% if not loop.last %}
    union all
    {% endif %}
    {% endfor %}
)
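To make the loop expansion concrete, here’s roughly what the compiled SQL looks like for the first two sources in the list. This is an illustrative sketch of the rendered output rather than code from the framework, and the “analytics” dataset name is hypothetical; it works because every staging model exposes the same standardised column list from Step 1:

with t_companies_pre_merged as (
    -- 'hubspot_crm' is the first entry in crm_warehouse_company_sources
    select
        'hubspot_crm' as source,
        *
    from analytics.stg_hubspot_crm_companies
    union all
    -- 'harvest_projects' is the second entry; the loop continues for the remaining sources
    select
        'harvest_projects' as source,
        *
    from analytics.stg_harvest_projects_companies
    -- ...and so on for the rest of the list
)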

Step 3: Group and Deduplicate on a Common Field

This unioned-but-not-yet-deduplicated set of all company records from all sources is then grouped (deduplicated) on a field that contains the same value for the company across all the different sources, such as the company name.

grouped as (
    SELECT
        company_name,
        max(company_phone) as company_phone,
        max(company_website) as company_website,
        max(company_industry) as company_industry,
        max(company_linkedin_company_page) as company_linkedin_company_page,
        max(company_linkedin_bio) as company_linkedin_bio,
        max(company_twitterhandle) as company_twitterhandle,
        max(company_description) as company_description,
        max(company_finance_status) as company_finance_status,
        max(company_currency_code) as company_currency_code,
        min(company_created_date) as company_created_date,
        max(company_last_modified_date) as company_last_modified_date
    from t_companies_pre_merged
    group by 1

Step 4: Store IDs and Multi-Value Fields in Arrays

For fields where we need to record a number of values for each company, for example the various source system IDs we’ll need when linking invoices coming from Xero, deals from Hubspot and payments from Stripe, we make use of repeating columns when working with Google BigQuery target data warehouses, like this:

all_company_ids as (
    SELECT
        company_name,
        array_agg(distinct company_id ignore nulls) as all_company_ids
    FROM t_companies_pre_merged
    group by 1
),

and repeating nested columns (aka structs) when storing multi-field arrays of related values, for example when storing the components of an address:

all_company_addresses as (
    SELECT
        company_name,
        array_agg(struct(company_address,
                         company_address2,
                         company_city,
                         company_state,
                         company_country,
                         company_zip) ignore nulls) as all_company_addresses

If the target warehouse is Snowflake then we’d achieve the same result by using a VARIANT data type to store the repeating fields as JSON:

all_company_ids as (
    SELECT
        company_name,
        array_agg(distinct company_id) as all_company_ids
    FROM t_companies_pre_merged
    group by 1
),
all_company_addresses as (
    SELECT
        company_name,
        array_agg(parse_json(concat('{"company_address":"', company_address,
                                    '", "company_address2":"', company_address2,
                                    '", "company_city":"', company_city,
                                    '", "company_state":"', company_state,
                                    '", "company_country":"', company_country,
                                    '", "company_zip":"', company_zip,
                                    '"} '))) as all_company_addresses

Step 5: Use a Lookup File for Manual Entity Merging

For dimensions where merging of members by name is not sufficient (for example, company names that cannot be relied on to always be spelt the same across all sources), we add a seed file that provides manual lookups mapping one member to another via their source system IDs, for example:

company_id,old_company_id
hubspot-3423423,xero-123121
hubspot-2412121,stripe-214122
xero-123121,salesforce-12312412
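For context, here’s a minimal sketch of how this merge list might be wired up as a dbt seed, assuming the file is saved as companies_merge_list.csv in the project’s seed directory and that the project name shown is hypothetical. Once loaded with dbt seed, the {{ ref('companies_merge_list') }} calls in the next step resolve to the seeded table:

# dbt_project.yml (sketch)
seeds:
  my_dbt_project:                  # hypothetical project name
    companies_merge_list:
      +column_types:               # keep the source IDs as plain text
        company_id: string         # e.g. string on BigQuery, varchar on Snowflake
        old_company_id: string

# then load (or reload) the seed into the warehouse with:
#   dbt seed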

We then extend the logic of the previous grouping step to make use of this merge file; for example, when BigQuery is the target warehouse:

from companies_pre_merged c
left outer join (
    select
        company_name,
        ARRAY(SELECT DISTINCT x FROM UNNEST(all_company_ids) AS x) as all_company_ids
    from (
        select
            company_name,
            array_concat_agg(all_company_ids) as all_company_ids
        from (
            select * from (
                select
                    c2.company_name as company_name,
                    c2.all_company_ids as all_company_ids
                from {{ ref('companies_merge_list') }} m
                join companies_pre_merged c1 on m.old_company_id in UNNEST(c1.all_company_ids)
                join companies_pre_merged c2 on m.company_id in UNNEST(c2.all_company_ids)
            )
            union all
            select * from (
                select
                    c2.company_name as company_name,
                    c1.all_company_ids as all_company_ids
                from {{ ref('companies_merge_list') }} m
                join companies_pre_merged c1 on m.old_company_id in UNNEST(c1.all_company_ids)
                join companies_pre_merged c2 on m.company_id in UNNEST(c2.all_company_ids)
            )
        )
        group by 1
    )
) m
on c.company_name = m.company_name
where c.company_name not in (
    select
        c2.company_name
    from {{ ref('companies_merge_list') }} m
    join companies_pre_merged c2 on m.old_company_id in UNNEST(c2.all_company_ids)
)
)

The process is the same for Snowflake data warehouse targets, albeit with slightly different SQL to reflect the different approach to repeating nested columns that this platform uses:

left outer join (
    select
        company_name,
        array_agg(all_company_ids) as all_company_ids
    from (
        select
            c2.company_name as company_name,
            c2.all_company_ids as all_company_ids
        from {{ ref('companies_merge_list') }} m
        join (SELECT c1.company_name, c1f.value::string as all_company_ids
              from {{ ref('int_companies_pre_merged') }} c1,
                   table(flatten(c1.all_company_ids)) c1f) c1
          on m.old_company_id = c1.all_company_ids
        join (SELECT c2.company_name, c2f.value::string as all_company_ids
              from {{ ref('int_companies_pre_merged') }} c2,
                   table(flatten(c2.all_company_ids)) c2f) c2
          on m.company_id = c2.all_company_ids
        union all
        select
            c2.company_name as company_name,
            c1.all_company_ids as all_company_ids
        from {{ ref('companies_merge_list') }} m
        join (SELECT c1.company_name, c1f.value::string as all_company_ids
              from {{ ref('int_companies_pre_merged') }} c1,
                   table(flatten(c1.all_company_ids)) c1f) c1
          on m.old_company_id = c1.all_company_ids
        join (SELECT c2.company_name, c2f.value::string as all_company_ids
              from {{ ref('int_companies_pre_merged') }} c2,
                   table(flatten(c2.all_company_ids)) c2f) c2
          on m.company_id = c2.all_company_ids
    )
    group by 1
) m
on c.company_name = m.company_name
where c.company_name not in (
    select
        c2.company_name
    from {{ ref('companies_merge_list') }} m
    join (SELECT c2.company_name, c2f.value::string as all_company_ids
          from {{ ref('int_companies_pre_merged') }} c2,
               table(flatten(c2.all_company_ids)) c2f) c2
      on m.old_company_id = c2.all_company_ids
)

The deduplicated and grouped list of companies, together with their multi-valued source ID fields and other field groups (additionally merged where needed using the manual merge lookup file), is then joined together ready for loading into the companies warehouse dimension table. The example below is taken from the equivalent users (contacts) dimension, which follows the same pattern:

SELECT
    i.all_user_ids,
    u.*,
    e.all_user_emails
FROM (
    SELECT
        user_name,
        MAX(contact_is_contractor) as contact_is_contractor,
        MAX(contact_is_staff) as contact_is_staff,
        MAX(contact_weekly_capacity) as contact_weekly_capacity,
        MAX(user_phone) as user_phone,
        MAX(contact_default_hourly_rate) as contact_default_hourly_rate,
        MAX(contact_cost_rate) as contact_cost_rate,
        MAX(contact_is_active) as contact_is_active,
        MAX(user_created_ts) as user_created_ts,
        MAX(user_last_modified_ts) as user_last_modified_ts
    FROM t_users_merge_list
    GROUP BY 1
) u
JOIN user_emails e
    ON u.user_name = COALESCE(e.user_name, 'Unknown')
JOIN user_ids i
    ON u.user_name = i.user_name

Step 6: Generate Surrogate Key for Dimension Table

Then, coming along at the last minute but taking all the glory, the warehouse dimension table model adds a unique identifier for the company record using the dbt_utils.surrogate_key macro.

WITH companies_dim as (
    SELECT
        {{ dbt_utils.surrogate_key(['company_name']) }} as company_pk,
        *
    FROM
        {{ ref('int_companies') }} c
)
select * from companies_dim
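Not covered above, but a natural companion to this step is a dbt schema test asserting that the new surrogate key really is unique and not null. A minimal sketch, assuming the dimension model is named wh_companies_dim as in the fact table join examples that follow:

# models/warehouse/schema.yml (sketch)
version: 2

models:
  - name: wh_companies_dim
    columns:
      - name: company_pk
        tests:
          - unique
          - not_null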

Step 7: Un-nest the Source ID Arrays for Fact Joins

Finally, when populating the fact tables that join to this dimension you’ll need a way to join to those repeating and variant data type columns that contain the array of source system IDs for each company, as shown in the screenshot below for Google BigQuery target warehouses.

[Image: array.png]

To do this when working with BigQuery as the target warehouse, we use the UNNEST() Standard SQL function to flatten those nested values and then perform the required fact-to-dimension table join, like this example for our delivery projects fact table:

WITH delivery_projects AS (
    SELECT *
    FROM {{ ref('int_delivery_projects') }}
),
companies_dim as (
    SELECT {{ dbt_utils.star(from=ref('wh_companies_dim')) }}
    from {{ ref('wh_companies_dim') }}
)
SELECT
    ...
FROM
    delivery_projects p
JOIN companies_dim c
    ON p.company_id IN UNNEST(c.all_company_ids)

Snowflake performs the flattening of the repeated source system ID field using its own syntax, so the equivalent SQL looks like this:

companies_dim as (
    SELECT
        c.company_pk,
        cf.value::string as company_id
    from {{ ref('wh_companies_dim') }} c,
         table(flatten(c.all_company_ids)) cf
)
SELECT
    ...
FROM
    delivery_projects p
JOIN companies_dim c
    ON p.company_id = c.company_id

Interested? Find out More

You can read more about our work with dbt, Google BigQuery, Snowflake and other modern data stack technologies on our website and blog.

How we handle deduplication and merging of company and contact data is just one of the design patterns we’ve included in our RA Warehouse for dbt framework, which we’ve made available as free-to-use, open-source code in a public repo on our Github site.

We’re a dbt Preferred Consulting Partner and this is one of the many ways we try to contribute value back to the dbt ecosystem. We welcome issue reports, pull requests and suggestions for other problems encountered on data centralization projects that it’d be cool for us to solve using this framework.

Or are you interested but don’t have the time to do it yourself? No problem! Click here to book a 100% free, no-obligation 30 minute call to discuss your data needs and how we could help get your data centralization initiative moving now.