Simplifying Custom Batch Aggregations with Incremental Backfills
If you need to define your own aggregation features in Tecton when the built-in
aggregations don't meet your requirements, you can streamline the process by
configuring incremental backfills with the incremental_backfills=True
option.
With incremental backfills enabled, Tecton will execute your query for each
batch_schedule interval between the specified feature_start_time and the time
when the feature is registered. Instead of running the query once on the entire
raw data set, it will be executed separately for each time window within the
specified range. This ensures that each query generates a single aggregation
for that particular time window.
To illustrate this with an example, let's say you register a feature view on May
1, 2023. You set the feature_start_time to January 1, 2023, indicating that you
want to backfill the feature view all the way back to the beginning of the year.
Additionally, you set the feature's batch_schedule to "1d", instructing Tecton
to run materialization jobs on a daily basis. Consequently, Tecton will run one
backfill query for each day in the period between January 1, 2023, and May 1,
2023 (excluding the end date). This period covers 120 days, resulting in a
total of 120 backfill queries.
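The count above can be checked with a short sketch. The helper name backfill_windows is hypothetical and not part of Tecton; it only enumerates the half-open windows that the description implies.

```python
from datetime import datetime, timedelta


def backfill_windows(feature_start_time, registration_time, batch_schedule):
    """Enumerate [start, end) windows, one per batch_schedule interval.

    Hypothetical helper for illustration only; not a Tecton API.
    """
    windows = []
    start = feature_start_time
    # Only keep windows that end on or before the registration time.
    while start + batch_schedule <= registration_time:
        windows.append((start, start + batch_schedule))
        start += batch_schedule
    return windows


windows = backfill_windows(
    feature_start_time=datetime(2023, 1, 1),
    registration_time=datetime(2023, 5, 1),
    batch_schedule=timedelta(days=1),
)
print(len(windows))  # 120
print(windows[0])    # first window: January 1 to January 2
print(windows[-1])   # last window: April 30 to May 1
```

Each tuple corresponds to one backfill query, so the length of the list is the number of queries.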
In Spark, Tecton usually runs backfill queries by grouping them into 10
concurrent jobs. You can control the number of queries included in each backfill
job by adjusting the max_batch_aggregation_interval parameter in your
batch_feature_view configuration. This lets you size the query batches
according to your needs.
On Snowflake, however, Tecton executes a single query per backfill job.
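As a rough illustration of how the interval setting relates to job count, the sketch below assumes that each backfill job covers one contiguous span of length max_batch_aggregation_interval. That grouping rule is an assumption made for this example, and estimate_backfill_jobs is a hypothetical helper, so treat the numbers as an estimate rather than a statement of Tecton's scheduling.

```python
import math
from datetime import timedelta


def estimate_backfill_jobs(num_windows, batch_schedule, max_interval):
    """Estimate job count, ASSUMING each job covers max_interval of windows.

    Hypothetical helper for illustration only; not a Tecton API.
    """
    # timedelta // timedelta yields an int: how many windows fit in one job.
    windows_per_job = max(1, max_interval // batch_schedule)
    return math.ceil(num_windows / windows_per_job)


# 120 daily windows grouped 10 days at a time.
grouped = estimate_backfill_jobs(120, timedelta(days=1), timedelta(days=10))

# One query per job, as described for Snowflake.
ungrouped = estimate_backfill_jobs(120, timedelta(days=1), timedelta(days=1))

print(grouped, ungrouped)  # 12 120
```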
To determine the number of materialization jobs that will be created when you
apply your changes, we recommend checking the output of the tecton plan
command. It shows the job count, so you can confirm that everything is
configured as expected.
Because they require many jobs, Batch Feature Views with incremental backfills
cannot compute features on the fly from the source. They must be materialized
before you request training data.
Example
Below are two examples of how to implement a Batch Feature View that computes a
distinct merchant count over a seven-day period. The first example uses
FilteredSource; the second uses the materialization_context.
In both examples, the feature timestamps are set to the last microsecond of the
materialization window:
TO_TIMESTAMP("{context.end_time}") - INTERVAL 1 MICROSECOND. This is done
because Tecton filters the output of the feature view transformation to
[context.start_time, context.end_time), which excludes context.end_time itself.
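The effect of the half-open interval can be seen in a small sketch. The function in_materialization_window is a hypothetical stand-in for the output filter described above, not a Tecton API.

```python
from datetime import datetime, timedelta


def in_materialization_window(ts, start_time, end_time):
    """Mimic a half-open [start_time, end_time) filter (illustrative only)."""
    return start_time <= ts < end_time


start_time = datetime(2023, 1, 1)
end_time = datetime(2023, 1, 2)

# Timestamp used in the examples: one microsecond before the window ends.
feature_ts = end_time - timedelta(microseconds=1)

kept = in_materialization_window(feature_ts, start_time, end_time)
dropped = in_materialization_window(end_time, start_time, end_time)
print(kept, dropped)  # True False
```

A row stamped exactly at end_time would fall outside the window, which is why the examples subtract one microsecond.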
Filtered Source
In this example, we use a FilteredSource with a start_time_offset of -6 days to
achieve our seven-day aggregate. The materialization window is one day long,
since the batch_schedule is one day. Offsetting the start time by -6 days means
that the total time window of data returned by the data source for a single
materialization query is 7 days long.
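The window arithmetic can be sketched as follows. The helper source_data_window is hypothetical; it only models the offset described above and is not how FilteredSource is implemented.

```python
from datetime import datetime, timedelta


def source_data_window(start_time, end_time, start_time_offset):
    """Return the raw-data window after shifting the start by the offset.

    Hypothetical helper for illustration only; not a Tecton API.
    """
    return (start_time + start_time_offset, end_time)


# One-day materialization window for January 7, 2023.
window_start = datetime(2023, 1, 7)
window_end = datetime(2023, 1, 8)

data_start, data_end = source_data_window(
    window_start, window_end, start_time_offset=timedelta(days=-6)
)
print(data_start)             # 2023-01-01 00:00:00
print(data_end - data_start)  # 7 days, 0:00:00
```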
Spark:

from datetime import datetime, timedelta

from tecton import FilteredSource, batch_feature_view, materialization_context

# `transactions` (data source) and `user` (entity) are assumed to be defined elsewhere.
@batch_feature_view(
    # Read 6 extra days before each 1-day window: 7 days of data per query.
    sources=[FilteredSource(transactions, start_time_offset=timedelta(days=-6))],
    entities=[user],
    mode="spark_sql",
    online=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    offline=True,
    ttl=timedelta(days=1),
    incremental_backfills=True,
)
# The context parameter is required because the query references context.end_time.
def user_distinct_merchant_transaction_count_7d(transactions, context=materialization_context()):
    return f"""
        SELECT
            USER_ID,
            TO_TIMESTAMP("{context.end_time}") - INTERVAL 1 MICROSECOND AS TIMESTAMP,
            COUNT(DISTINCT MERCHANT) AS DISTINCT_MERCHANT_COUNT_7D
        FROM
            {transactions}
        GROUP BY USER_ID
    """
Snowflake:

from datetime import datetime, timedelta

from tecton import FilteredSource, batch_feature_view, materialization_context

# `transactions` (data source) and `user` (entity) are assumed to be defined elsewhere.
@batch_feature_view(
    # Read 6 extra days before each 1-day window: 7 days of data per query.
    sources=[FilteredSource(transactions, start_time_offset=timedelta(days=-6))],
    entities=[user],
    mode="snowflake_sql",
    online=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    offline=True,
    ttl=timedelta(days=1),
    incremental_backfills=True,
)
# The context parameter is required because the query references context.end_time.
def user_distinct_merchant_transaction_count_7d(transactions, context=materialization_context()):
    return f"""
        SELECT
            USER_ID,
            TO_TIMESTAMP('{context.end_time}') - INTERVAL '1 MICROSECOND' AS TIMESTAMP,
            COUNT(DISTINCT MERCHANT) AS DISTINCT_MERCHANT_COUNT_7D
        FROM
            {transactions}
        GROUP BY USER_ID
    """
Materialization Context
In this example, we use the materialization context (i.e. the context
parameter in our feature view function) to filter for the right time window of
raw data needed for each materialization query. That data can then be aggregated
to produce the feature values for that time period.
Spark:

from datetime import datetime, timedelta

from tecton import batch_feature_view, materialization_context

# `transactions` (data source) and `user` (entity) are assumed to be defined elsewhere.
@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode="spark_sql",
    online=True,
    offline=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    ttl=timedelta(days=1),
    incremental_backfills=True,
)
def user_distinct_merchant_transaction_count_7d(transactions, context=materialization_context()):
    # The WHERE clause selects the 7 days of raw data ending at context.end_time.
    return f"""
        SELECT
            USER_ID,
            TO_TIMESTAMP("{context.end_time}") - INTERVAL 1 MICROSECOND AS TIMESTAMP,
            COUNT(DISTINCT MERCHANT) AS DISTINCT_MERCHANT_COUNT_7D
        FROM
            {transactions}
        WHERE TIMESTAMP >= TO_TIMESTAMP("{context.start_time}") - INTERVAL 6 DAYS
            AND TIMESTAMP < TO_TIMESTAMP("{context.end_time}")
        GROUP BY USER_ID
    """
Snowflake:

from datetime import datetime, timedelta

from tecton import batch_feature_view, materialization_context

# `transactions` (data source) and `user` (entity) are assumed to be defined elsewhere.
@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode="snowflake_sql",
    online=True,
    offline=True,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2020, 10, 10),
    ttl=timedelta(days=1),
    incremental_backfills=True,
)
def user_distinct_merchant_transaction_count_7d(transactions, context=materialization_context()):
    # The WHERE clause selects the 7 days of raw data ending at context.end_time.
    return f"""
        SELECT
            USER_ID,
            TO_TIMESTAMP('{context.end_time}') - INTERVAL '1 MICROSECOND' AS TIMESTAMP,
            COUNT(DISTINCT MERCHANT) AS DISTINCT_MERCHANT_COUNT_7D
        FROM
            {transactions}
        WHERE TIMESTAMP >= TO_TIMESTAMP('{context.start_time}') - INTERVAL '6 DAYS'
            AND TIMESTAMP < TO_TIMESTAMP('{context.end_time}')
        GROUP BY USER_ID
    """
Performance Considerations
While incremental backfills simplify your transformation logic, they can also
result in many more backfill queries, which can become inefficient at scale.
Leveraging time filters on your data source is especially important to ensure
that each query operates only over the data that it needs. Although you can
accomplish this with your own filter logic within the feature view using the
materialization_context, Tecton recommends that you leverage FilteredSource.
Using a FilteredSource ensures that data sources are filtered correctly and any
configured partitions are used for improved filter performance.
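To get a feel for why time filters matter, the sketch below counts how many daily partitions a backfill would read in total. Everything here is a hypothetical model: it assumes one partition per day, 120 daily queries with a 7-day lookback, and a source that holds only the days those queries need. Real costs depend on your data layout and engine.

```python
def total_partitions_scanned(num_queries: int, partitions_per_query: int) -> int:
    """Total partitions read across all backfill queries (illustrative model)."""
    return num_queries * partitions_per_query


NUM_QUERIES = 120    # one query per daily window (hypothetical backfill)
LOOKBACK_DAYS = 7    # days of raw data each query actually needs
# Distinct days touched by the whole backfill: 120 windows plus 6 lookback days.
TOTAL_PARTITIONS = NUM_QUERIES + LOOKBACK_DAYS - 1

# With a time filter, each query reads only its 7-day window.
filtered = total_partitions_scanned(NUM_QUERIES, LOOKBACK_DAYS)

# Without a time filter, each query reads every partition in the source.
unfiltered = total_partitions_scanned(NUM_QUERIES, TOTAL_PARTITIONS)

print(filtered, unfiltered)  # 840 15120
```

Under these assumptions the unfiltered backfill reads 18 times as many partitions, and the gap widens as the backfill range grows.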