Manually Trigger Materialization
Background
By default, feature materialization runs automatically on a schedule configured in the Feature View definition.
Batch & Stream Feature Views also support manually-triggered materialization via an API call.
Manual materialization can be triggered through the following methods:
- The Tecton SDK
- The Tecton Airflow provider
This page explains how to use the Tecton SDK to manually trigger materialization.
To use the Tecton Airflow provider instead, see the README file in the provider repo.
Disabling scheduled batch jobs for a Feature View
When batch_trigger=BatchTriggerType.MANUAL is set in the Feature View definition, Tecton will not schedule any batch materialization jobs for the Feature View. Batch materialization will only be possible by manually triggering jobs with the Tecton SDK or the Tecton Airflow provider.
batch_schedule or aggregation_interval must still be defined for materialization, as Tecton partitions data based on this interval.
For a StreamFeatureView, only batch materialization job scheduling will be impacted by the batch_trigger setting. Streaming materialization job scheduling will still be managed by Tecton.
If a Data Source input to the Feature View has data_delay set, that delay will still be factored in when constructing training data sets, but it does not affect when a job can be triggered with the materialization API.
Example of a Feature View configured for manual materialization
Rift (mode="pandas"):
from tecton import batch_feature_view, FilteredSource, Aggregation, BatchTriggerType, TimeWindow
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[FilteredSource(transactions_batch)],
    entities=[user],
    mode="pandas",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=1))),
        Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=30))),
        Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=90))),
    ],
    online=False,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    description="User transaction totals over a series of time windows, updated daily.",
    batch_trigger=BatchTriggerType.MANUAL,  # Use manual triggers
)
def user_transaction_counts(transactions):
    transactions["transaction"] = 1
    return transactions[["user_id", "transaction", "timestamp"]]
Spark (mode="spark_sql"):
from tecton import batch_feature_view, FilteredSource, Aggregation, BatchTriggerType, TimeWindow
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[FilteredSource(transactions_batch)],
    entities=[user],
    mode="spark_sql",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=1))),
        Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=30))),
        Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=90))),
    ],
    online=False,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    description="User transaction totals over a series of time windows, updated daily.",
    batch_trigger=BatchTriggerType.MANUAL,  # Use manual triggers
)
def user_transaction_counts(transactions):
    return f"""
        SELECT
            user_id,
            1 as transaction,
            timestamp
        FROM
            {transactions}
        """
Automatic Backfill for Manually-Materialized Feature Views
If a Feature View has manual_trigger_backfill_end_time set, Tecton will automatically create jobs to backfill data between feature_start_time and manual_trigger_backfill_end_time.
If changes are made to feature_start_time or manual_trigger_backfill_end_time, Tecton will intelligently schedule jobs to backfill only unmaterialized data (instead of fully rematerializing the entire Feature View).
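As a sketch, the Rift example above could be configured to automatically backfill its first month of data. The manual_trigger_backfill_end_time value below is illustrative, and the aggregations and description are abbreviated from the earlier definition:

from tecton import batch_feature_view, FilteredSource, Aggregation, BatchTriggerType, TimeWindow
from fraud.entities import user
from fraud.data_sources.transactions import transactions_batch
from datetime import datetime, timedelta


@batch_feature_view(
    sources=[FilteredSource(transactions_batch)],
    entities=[user],
    mode="pandas",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="transaction", function="count", time_window=TimeWindow(window_size=timedelta(days=1))),
    ],
    online=False,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    # Illustrative end date: Tecton automatically creates jobs to backfill
    # the range between feature_start_time and manual_trigger_backfill_end_time.
    manual_trigger_backfill_end_time=datetime(2022, 6, 1),
    description="User transaction counts, manually triggered with automatic backfill.",
    batch_trigger=BatchTriggerType.MANUAL,
)
def user_transaction_counts(transactions):
    transactions["transaction"] = 1
    return transactions[["user_id", "transaction", "timestamp"]]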
Tecton SDK methods for triggering, monitoring and canceling materialization
In the Tecton SDK, the Feature View interactive classes have methods for triggering, monitoring and canceling materialization jobs. See the BatchFeatureView and StreamFeatureView interactive SDK reference for method details.
You can still use these methods for Feature Views with scheduled materialization enabled.
Triggering a new materialization job
The trigger_materialization_job() method allows you to initiate a job to materialize feature values for the specified time range. This method returns a job identifier that we'll reference in later steps.
To backfill a newly created feature, you can call this method as a one-off to backfill data from the feature start time to the current time. Note that you may want to break up particularly large backfills into multiple jobs.
During regular operations, you will likely want to set up an automated process that materializes the most recent time period once the upstream data for that period is available.
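As a sketch of such automation (assuming the daily interval from the example above, and leaving the check for upstream data readiness to your orchestrator), a recurring task could trigger the most recent complete day:

from datetime import datetime, timedelta

import tecton

# Sketch of a recurring task that materializes the most recent complete day.
# Assumes a daily partition interval; detecting whether upstream data has
# landed is left to your orchestrator and is not shown here.
fv = tecton.get_workspace("dev").get_feature_view("user_transaction_counts")

# Truncate to midnight UTC so the window aligns with the daily partition interval.
end_time = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
start_time = end_time - timedelta(days=1)

job_id = fv.trigger_materialization_job(
    start_time=start_time,
    end_time=end_time,
    online=False,
    offline=True,
)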
The materialization window between start_time and end_time must be evenly divisible by the partition interval that's defined by the batch_schedule or aggregation_interval. Also, start_time must align with the partition interval. That is, start_time % <partition interval> must equal 0.
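For example, with the daily aggregation_interval used above, start_time=datetime(2022, 5, 1) and end_time=datetime(2022, 5, 3) form a valid two-day window, whereas a start_time of datetime(2022, 5, 1, 12) (not aligned to a day boundary) or an end_time of datetime(2022, 5, 2, 6) (not a whole number of daily partitions) would be rejected.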
Here's an example of using the materialization API to trigger a batch job:
from datetime import datetime

import tecton

fv = tecton.get_workspace("dev").get_feature_view("user_transaction_counts")

job_id = fv.trigger_materialization_job(
    start_time=datetime(2022, 5, 1),
    end_time=datetime(2022, 5, 2),
    online=False,
    offline=True,
)
Waiting for job completion
After triggering a new job, you may want to monitor the job status to start a downstream process once complete.
To block your process until the job completes, use the wait_for_materialization_job() method. Materialization jobs can take anywhere from minutes to hours depending on the amount of data processed.
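A minimal sketch of blocking on the job triggered above (assuming wait_for_materialization_job() accepts the job identifier returned by trigger_materialization_job(); see the SDK reference for the exact signature):

# Block until the job triggered earlier completes.
fv.wait_for_materialization_job(job_id)
print("Materialization job finished:", job_id)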
Alternatively, you can poll for completion status using the get_materialization_job() method. This returns a MaterializationJobData object with details about the job status. The job has completed successfully if MaterializationJobData.state == "SUCCESS".
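A minimal polling sketch, reusing fv and job_id from the trigger example above (the 60-second interval is arbitrary, and only the "SUCCESS" state is taken from this page; consult the SDK reference for the full set of terminal states):

import time

# Poll until the job reports success; add handling for failed or cancelled
# states as needed for your pipeline.
while True:
    job_data = fv.get_materialization_job(job_id)
    if job_data.state == "SUCCESS":
        print("Materialization job succeeded")
        break
    time.sleep(60)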
Canceling a materialization job
You can cancel a running materialization job using the cancel_materialization_job(job_id) method. Once canceled, a job will not be retried further, and its run state will be set to MANUAL_CANCELLATION_REQUESTED. Note that cancellation is asynchronous, so it may take up to several minutes to complete, at which point the job run state will be set to MANUALLY_CANCELLED.
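For example, reusing fv and job_id from the trigger example above:

# Request cancellation; the run state moves to MANUAL_CANCELLATION_REQUESTED
# immediately and to MANUALLY_CANCELLED once cancellation completes.
fv.cancel_materialization_job(job_id)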
Overwriting previously materialized data
By default, the trigger_materialization_job() method will return an error if the specified time period overlaps with the time period of a previously successful materialization job.
The overwrite=True option allows manual materialization to overwrite previously materialized data.
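For example, to rewrite a day that was already materialized successfully (reusing fv from the earlier trigger example):

# Without overwrite=True, this call would return an error because the window
# overlaps a previously successful materialization job.
job_id = fv.trigger_materialization_job(
    start_time=datetime(2022, 5, 1),
    end_time=datetime(2022, 5, 2),
    online=False,
    offline=True,
    overwrite=True,
)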
In the offline store, the manual materialization job will delete and rewrite any previously materialized data for the specified time period.
In the online store, the manual materialization job executes an upsert. If an entity doesn't already exist in the online store, its feature data is inserted. If it does already exist, its data is updated if its latest timestamp is greater than or equal to what's already in the online store. If a Feature View has a ttl defined, then an online store upsert will only happen for data whose timestamp still falls within the ttl window.
This operation is generally safe if:
- Your previous job completed and did not output any feature data.
- Your Feature View is only materialized offline (the Feature View is configured with offline=True and online=False).