The ‘e’ in Hex is for ELT

Quick note: the justification for doing this is worth something like a 17-page manifesto. I’m focusing on the how here, and maybe I’ll eventually write the manifesto.

General Approach

This specific problem is loading Point-of-Sale data from a vertical-specific system into a database for daily analysis, but the approach generalizes to most small/medium data use cases where ~24-hour latency is totally fine.

The ELT pipeline uses Hex Notebooks and dbt jobs, both orchestrated independently with crons. dbt is responsible for creating all tables and handling grants as well as data transformation, while Hex handles extract and load from a set of REST APIs into the database. Hex loads into a “queue” of sorts – simply a table in Snowflake that can take JSON pages and some metadata. Conceptually, it looks like this.
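As a rough in-code picture of that queue (a sketch with hypothetical names like QueueRow and enqueue_page, not the actual Hex or dbt code): each API page becomes one row holding the store name, a load timestamp, and the untouched JSON array. The store name and ticket values are made up.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone


# Hypothetical stand-in for one row of the Snowflake queue table:
# (store_name VARCHAR, elt_date TIMESTAMPTZ, data VARIANT)
@dataclass
class QueueRow:
    store_name: str
    elt_date: datetime
    data: str  # one whole API page, serialized as JSON


def enqueue_page(queue, store_name, page_records):
    """Append one API page to the in-memory queue, untouched apart from serialization."""
    row = QueueRow(store_name, datetime.now(timezone.utc), json.dumps(page_records))
    queue.append(row)
    return row


queue = []
enqueue_page(queue, "store_a", [{"ticketId": 1}, {"ticketId": 2}])
enqueue_page(queue, "store_a", [{"ticketId": 3}])
print(len(queue), "pages queued")
```

The point of the shape is that the loader never interprets the payload; splitting pages into records is left to the transformation step.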

Loading data with Hex

Since Hex is a Python notebook running inside managed infrastructure, we can skip the nonsense of environment management, VMs, orchestration, and so on and just get to loading data. First things first, let’s add the Snowflake connector to our environment.

Bash
!pip3 install snowflake-connector-python

Now that we have added that package to our environment, we can build our Python functions. I’ve added some simple documentation as comments below.

Python
import requests
import os
import json
import snowflake.connector
from snowflake.connector.errors import ProgrammingError
from datetime import datetime

# login to snowflake
def snowflake_login():
    connection = snowflake.connector.connect(
        user=SNOWFLAKE_USER,
        password=SNOWFLAKE_PASSWORD,
        account=SNOWFLAKE_ACCOUNT,
        database=os.getenv('SNOWFLAKE_DATABASE'),
        schema=os.getenv('SNOWFLAKE_SCHEMA'),
        warehouse=os.getenv('SNOWFLAKE_WAREHOUSE'),
    )

    # print the database and schema
    print(f"Connected to database '{os.getenv('SNOWFLAKE_DATABASE')}' and schema '{os.getenv('SNOWFLAKE_SCHEMA')}'")

    return connection

# get the last run date for a specific endpoint and store from snowflake
def last_run_date(conn, table_name, store_name):
    cur = conn.cursor()
    try:
        # Endpoints take UTC time zone
        # table_name can't be a bind parameter, so only pass trusted values here;
        # store_name is bound (%s) so a quote in the name can't break the query
        query = f"SELECT MAX(UPDATED_AT) FROM PROD_PREP.{table_name} WHERE store_name = %s"
        print(query)
        cur.execute(query, (store_name,))
        result = cur.fetchone()[0]
        try:
            result_date = datetime.strptime(str(result).strip("(),'"), '%Y-%m-%d %H:%M:%S').date()
        except ValueError:
            # retry with fractional seconds
            try:
                result_date = datetime.strptime(str(result).strip("(),'"), '%Y-%m-%d %H:%M:%S.%f').date()
            except ValueError:
                # result is None (no rows yet) or not in an expected format
                print("error: Cannot handle datetime format. Triggering full refresh.")
                result_date = '1900-01-01'
    except ProgrammingError as e:
        if e.errno == 2003:
            print(f'error: Table {table_name} does not exist in Snowflake. Triggering full refresh.')
            # this will trigger a full refresh if there is an error, so be careful here
            result_date = '1900-01-01'
        else:
            raise
    finally:
        # always release the cursor and connection, even when re-raising
        cur.close()
        conn.close()
    return result_date

# Request pages, only return total page number
def get_num_pages(api_endpoint,auth_token,as_of_date):
    header = {'Authorization': auth_token}
    total_pages = requests.get(api_endpoint+'?page=1&q[updated_at_gt]='+str(as_of_date),headers=header).json()['total_pages']
    return total_pages

# Returns a specific page given a specific "as of" date and page number
def get_page(api_endpoint,auth_token,as_of_date,page_num):
    header = {'Authorization': auth_token}
    print(f"loading data from endpoint: {api_endpoint}" )
    page = requests.get(api_endpoint+'?page='+str(page_num)+'&q[updated_at_gt]='+str(as_of_date),headers=header).json()
    return page

# Loads data into snowflake
def load_to_snowflake(store_name, source_api, api_key, updated_date, total_pages, conn, stage_table, json_element):
    cur = conn.cursor()
    create_query = f"CREATE TABLE IF NOT EXISTS {stage_table} ( store_name VARCHAR , elt_date TIMESTAMPTZ, data VARIANT)"
    cur.execute(create_query)
    
    # loop through the pages
    for page_number in range(1,total_pages+1,1):
        response_json = get_page(source_api,api_key,updated_date,page_number)
        raw_json = response_json[json_element]
        raw_data = json.dumps(raw_json)
        # bind the values (%s) and let the connector escape quotes and backslashes
        cur.execute(
            f"INSERT INTO {stage_table} (store_name, elt_date, data) SELECT %s, CURRENT_TIMESTAMP, PARSE_JSON(%s)",
            (store_name, raw_data),
        )
        print(f"loaded {page_number} of {total_pages}")
    
    cur.close()
    conn.close()

# create a wrapper for previous functions so we can invoke a single statement for a given API
def job_wrapper(store_name, api_path, api_key, target_table, target_table_key):
    # get the updated date for a specific table
    updated_date = last_run_date(snowflake_login(), target_table, store_name)
    print(f"The maximum value in the 'updated_at' column of the {target_table} table is: {updated_date}")

    # get the number of pages based on the updated date
    pages = get_num_pages(api_path,api_key,updated_date)
    print(f"There are {pages} pages to load from {api_path}")

    # load to snowflake
    load_to_snowflake(store_name, api_path, api_key,updated_date,pages,snowflake_login(),target_table, target_table_key)

Now that we have our Python in place, we can invoke a specific API. Note that Hex also has built-in environment variable management, so we can keep our keys safe while still having a nice development & production flow.

Python
job_wrapper('store_name','api_url',AUBURN_API_KEY,'end_point_name','endpoint_unique_key')

To deploy this for more endpoints, simply update the api_url, end_point_name, and endpoint_unique_key arguments. You can also hold them in a Python dict and reference it as a variable, but I found that annoying when troubleshooting.
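For what it’s worth, the dict version might look something like the sketch below. The store name, URLs, table names, and keys are made up for illustration, and run_all is a hypothetical helper that takes the job function as an argument so the loop can be tried without Snowflake; in the notebook you would pass job_wrapper itself.

```python
# Hypothetical endpoint registry: every value here is a placeholder.
ENDPOINTS = {
    "sales_histories": {
        "api_path": "https://example.com/api/v1/sales_histories",
        "target_table_key": "sales_histories",
    },
    "customers": {
        "api_path": "https://example.com/api/v1/customers",
        "target_table_key": "customers",
    },
}


def run_all(job, store_name, api_key, endpoints=ENDPOINTS):
    """Call job(...) once per endpoint, using job_wrapper's argument order."""
    failed = []
    for target_table, cfg in endpoints.items():
        try:
            job(store_name, cfg["api_path"], api_key, target_table, cfg["target_table_key"])
        except Exception as exc:  # keep going so one bad endpoint doesn't block the rest
            print(f"{target_table} failed: {exc}")
            failed.append(target_table)
    return failed


# Dry run with a stand-in job that only records its arguments.
calls = []
failed = run_all(lambda *args: calls.append(args), "store_a", "fake-key")
print(f"{len(calls)} endpoints run, {len(failed)} failed")
```

Collecting failures instead of stopping at the first one is a design choice here, and it also hints at the troubleshooting cost: errors now surface inside a loop rather than in their own cell.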

The last step in Hex is to publish the notebook so that you can set a cron job on it – I set mine to run at midnight PST.

Transforming in dbt

I am using on-run-start & on-run-end scripts in my dbt project to frame out the database, in my case, Snowflake.

YAML
on-run-start:
  - CREATE TABLE IF NOT EXISTS STAGING.sales_histories ( store_name VARCHAR , elt_date TIMESTAMPTZ, data VARIANT, id INT) ;

Now that data is in Snowflake (in the RAW schema), we can use a dbt macro to turn the pages coming from the API into rows in a database. But first we need to define our sources (the tables built in the on-run-start step) in YAML.

YAML
version: 2

sources:
  - name: POSABIT
    database: DWH_V2
    schema: STAGING
    tables:
      - name: sales_histories

Repeat for each API endpoint that you want to stage in your database.

Now consider the following model which transforms the JSON pages to rows:

SQL
{{ config(pre_hook="{{ merge_queues( 'sales_histories' , 'STAGING','ticketId' ) }}") }}

select 
    *,
    data:updated_at::datetime as updated_at
from {{ source( 'POSABIT', 'sales_histories' ) }}

Of course, the real magic here is in the “merge_queues” macro, which is below:

SQL
{% macro merge_queues( table_name, schema, unique_id )%}
    MERGE INTO {{schema}}.{{table_name}} t
        USING (
            with cte_top_level as (
            -- we can get some duplicate records when transactions happen while the API runs
            -- as a result, we want to take the latest date in the elt_date column
            -- this used to be a group by, and is now a qualify
                select
                    store_name,
                    elt_date,
                    value as val,
                    val:{{unique_id}} as id
                from RAW.{{table_name}},
                lateral flatten( input => data )
                QUALIFY ROW_NUMBER() OVER (PARTITION BY store_name, id ORDER BY elt_date desc) = 1
            )
            select 
                *
            from cte_top_level
        ) s
        ON t.id = s.id AND t.store_name = s.store_name
        -- need to handle updates if they come in
        WHEN MATCHED THEN
            UPDATE SET t.store_name = s.store_name,
                t.elt_date = s.elt_date,
                t.data = s.val,
                t.id = s.id
        WHEN NOT MATCHED THEN
            INSERT ( store_name, elt_date, data, id)
            VALUES ( s.store_name, s.elt_date, s.val, s.id);

    -- truncate the queue
    TRUNCATE RAW.{{table_name}};
{% endmacro %}

A key note here is that Snowflake does not handle MERGE like an OLTP database: duplicate keys in the source can end up as duplicate rows in the target or fail the merge as nondeterministic, so we need to de-duplicate before we INSERT or UPDATE. I learned this the hard way by trying to de-dupe after the data was already in my staging table, which is annoyingly hard to do in Snowflake! So I had to truncate and try again a few times.
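To make the de-duplication step concrete, here is a small pure-Python sketch of what the lateral flatten plus QUALIFY ROW_NUMBER() does: explode each page into records, then keep only the most recently loaded record per (store_name, id). The sample rows and the dedupe_queue name are made up for illustration; the real work happens in the SQL above.

```python
from datetime import datetime


def dedupe_queue(queue_rows, unique_id):
    """Flatten pages into records and keep the latest record per (store_name, id)."""
    latest = {}
    for store_name, elt_date, page in queue_rows:
        for record in page:  # lateral flatten: one output row per array element
            key = (store_name, record[unique_id])
            # ORDER BY elt_date DESC ... = 1: a newer load replaces an older one
            if key not in latest or elt_date > latest[key][1]:
                latest[key] = (store_name, elt_date, record)
    return list(latest.values())


queue_rows = [
    ("store_a", datetime(2023, 1, 1, 8), [{"ticketId": 1, "total": 10}, {"ticketId": 2, "total": 5}]),
    ("store_a", datetime(2023, 1, 2, 8), [{"ticketId": 1, "total": 12}]),  # ticket 1 again, updated
]

rows = dedupe_queue(queue_rows, "ticketId")
for store_name, elt_date, record in rows:
    print(store_name, elt_date.date(), record)
```

With one row per key going into the MERGE, each target row can match at most one source row, which is the condition the macro relies on.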

Now that the data is in a nice tabular format, we can run it like a typical dbt project.

Let me know if you have any questions or comments – you can find me on Twitter @matsonj

Other notes

There are lots of neat features that I didn’t end up implementing. A noncomprehensive list is below:

  • Source control + CI/CD for the Hex notebooks – the Hex flow is so simple that I didn’t feel this was necessary.
  • Hex components to reduce repetition of code – today, every store gets its own notebook.
  • Using mdsinabox patterns with DuckDB instead of Snowflake – although part of the reason to do this was to defer infrastructure to bundled vendors.
