Incremental models

Incremental models are created as tables in your data warehouse. On the first run, they transform all rows of your source data; on subsequent runs, they process only the rows selected by a user-provided filter, typically rows that were newly added or updated.

This incremental loading mechanism significantly reduces transformation runtime and compute costs, improving overall warehouse performance.

Building incremental models

Building an incremental model follows the same initial process as building a model materialized as a table or view. However, you will also need to include both the incremental configuration and the filtering criteria.

Here's an example to illustrate the structure of an incremental model:

Configure the model as incremental.

Define the materialization type as part of the config block.


Define the model's logic.

Provide the SQL logic that will act as the base for your incremental model.


Use the is_incremental() macro to filter rows.

Use the is_incremental() macro to filter for "new" rows, which are those added since the last model run. Use the {{ this }} variable to easily query your target table for the latest timestamp.

events.sql

{{
    config(
        materialized='incremental'
    )
}}

select
    *,
    my_slow_function(my_column)

from raw_app_data.events

{% if is_incremental() %}

-- this filter will only be applied on an incremental run
where event_time > (select max(event_time) from {{ this }})

{% endif %}

Set an optional unique_key to identify rows that need to be updated.

The unique_key parameter is optional. When set, Y42 merges the new data into the zero-copy clone of the existing table, allowing existing rows to be updated.

If unique_key is not specified, Y42 simply appends the new data to the end of the zero-copy clone of the existing table, allowing only the addition of new rows.

events.sql

{{
    config(
        materialized='incremental',
        unique_key='id'
    )
}}

select
    *,
    my_slow_function(my_column)

from raw_app_data.events

{% if is_incremental() %}

-- this filter will only be applied on an incremental run
where event_time > (select max(event_time) from {{ this }})

{% endif %}
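If a single column doesn't uniquely identify a row, dbt-style configs also accept a list of columns as the unique_key. A minimal sketch of the config block, assuming Y42 supports dbt's list syntax here and using hypothetical column names:

events.sql

{# hypothetical composite key; assumes dbt-style list support for unique_key #}
{{
    config(
        materialized='incremental',
        unique_key=['user_id', 'event_id']
    )
}}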

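On the very first run there is no existing table, so is_incremental() evaluates to false and the filter block is skipped: the model is built from the full query. Roughly, the initial run of the events.sql model above compiles to a plain create-table-as-select (the exact DDL varies by warehouse):

compiled_query.sql

create table <your_destination_model> as (
    select
        *,
        my_slow_function(my_column)
    from raw_app_data.events
    -- is_incremental() was false, so the where filter was not applied
);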

Merge strategy

Y42 supports various strategies for implementing incremental models, including the merge incremental strategy. This approach appends new rows or updates existing ones based on the unique_key configuration, mirroring the principles of Slowly Changing Dimension Type 1 (SCD1): when the key(s) match, new data overwrites old data; otherwise, new data is appended. This makes the strategy ideal for scenarios where maintaining historical data variations is not required.

You can specify the incremental_strategy configuration for individual models or globally for all models in your dbt_project.yml file:

dbt_project.yml

models:
  +incremental_strategy: "merge"
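You can also scope the setting to a subset of your project rather than all models; a minimal sketch, assuming a hypothetical project named my_project with a staging folder:

dbt_project.yml

models:
  my_project:
    staging:
      +incremental_strategy: "merge"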

Or, for an individual model:

models/customers.sql

{{
    config(
        materialized='incremental',
        unique_key='id',
        incremental_strategy='merge',
        ...
    )
}}

select ...
from

The above configuration translates into a SQL MERGE statement as follows:

compiled_query.sql

merge into <your_destination_model> as dbt_internal_dest
using (
    select ..
    from
) as dbt_internal_source
on (dbt_internal_source.id = dbt_internal_dest.id)
when matched then update set
    dbt_internal_dest.col1 = dbt_internal_source.col1,
    dbt_internal_dest.col2 = dbt_internal_source.col2,
    ..
when not matched then insert
    (col1, col2, ..)
    values (..)
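For comparison, when no unique_key is configured there is nothing to match on, so no merge is needed: the incremental run simply appends the filtered rows. A rough sketch of the compiled statement in that case (not the literal output):

compiled_query.sql

insert into <your_destination_model>
select ..
from (
    -- model SQL, with the is_incremental() filter applied
    select ..
) as dbt_internal_source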

Y42 vs. dbt: incremental model execution

In Y42, the tables in your data warehouse (DWH) are synchronized with your code. This abstraction eliminates concerns about data loss caused by an erroneous run overwriting a table in your DWH. Unlike in dbt, expensive full-refreshes are not necessary: if a job runs with incorrect configurations, erroneous queries, or source issues, you can revert your changes and restore the model's data to its previous state.

In contrast, each dbt run overwrites your existing DWH table. If the table was incrementally built, overwriting it with incorrect data can be costly. You would have to perform a full-refresh, which undermines the efficiency of incremental models.

BigQuery limitations

In BigQuery, tables are not partitioned or clustered by default. This means that even if you use a WHERE clause to filter data, the entire table is scanned. Consequently, incremental models like the following will read the whole source table (e.g., raw_app_data.events) on every run:

events.sql

{{
    config(
        materialized='incremental'
    )
}}

select
    *,
    my_slow_function(my_column)

from raw_app_data.events

{% if is_incremental() %}

-- this filter will only be applied on an incremental run
where event_time > (select max(event_time) from {{ this }})

{% endif %}

Therefore, the incremental model does not reduce costs, although it does save time, because any filter or limit reduces execution time in BigQuery.

Incremental Predicates

incremental_predicates offer an advanced approach for managing large data volumes in incremental models, where the scale justifies additional performance optimization. This configuration accepts a list of valid SQL expressions. Note that Y42 does not verify the syntax of these SQL statements.

my_incremental_model.sql

{{
    config(
        materialized = 'incremental',
        unique_key = 'id',
        cluster_by = ['session_start'],
        incremental_strategy = 'merge',
        incremental_predicates = [
            "target_alias.session_start > dateadd(day, -7, current_date)"
        ]
    )
}}

..
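With the merge strategy, each predicate is added as an extra condition alongside the unique_key join when the statement is compiled. A rough sketch of the resulting MERGE, reusing target_alias from the config above (actual alias names depend on the warehouse adapter):

compiled_query.sql

merge into <your_destination_model> as target_alias
using (
    select ..
) as dbt_internal_source
on (target_alias.session_start > dateadd(day, -7, current_date))
    and (dbt_internal_source.id = target_alias.id)
when matched then update set
    ..
when not matched then insert
    (..)
    values (..)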

FAQ

What happens if there was a mistake and I have to roll back my incremental model?

This isn't a problem, as Y42 uses zero-copy clones for incremental models, which means every incremental job gets a new table. If the incremental job fails your tests or the incrementally updated data is incorrect, you can roll back as you would with any other Y42 model and use your correct data again. There's no need to worry about mistakes in code or source data corrupting your downstream dependencies or stakeholder-facing tables, and you don't need to run an expensive full-refresh to recover.

What happens when the schema of my upstream model changes?

At the moment, we perform a full-refresh in this case. In the future, we plan to allow more fine-grained control from the user side.

When do we trigger a full-refresh instead of an incremental job?

Currently, we trigger a full-refresh whenever an upstream dependency or the incremental model itself changes. In the future, we will allow more fine-grained control within configs.

How do I partition a model in Y42?

You cannot today. We will enable this feature soon.