Aggregation Windows
In Tecton 0.7, the Aggregation Engine supports sliding time windows, which aggregate over a time window of fixed length.
Tecton versions after 0.7 support additional kinds of aggregation windows; see the documentation for your version.
Example
from datetime import datetime, timedelta
from tecton import StreamFeatureView, Entity, Aggregation, PushSource
from tecton.types import String, Timestamp, Float64, Field

# Define the entity for this feature
user = Entity(name="User", join_keys=["user_id"])

# Define a Streaming Push Source
input_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="transaction_amount", dtype=Float64),
]
transactions_source = PushSource(
    name="transactions_source",
    schema=input_schema,
)

# Time window aggregation feature
transaction_aggregations = StreamFeatureView(
    name="transaction_aggregations",
    source=transactions_source,
    entities=[user],
    online=True,
    offline=True,
    feature_start_time=datetime(2018, 1, 1),
    aggregations=[
        Aggregation(column="transaction_amount", function="sum", time_window=timedelta(days=30)),
        Aggregation(column="transaction_amount", function="sum", time_window=timedelta(days=180)),
        Aggregation(column="transaction_amount", function="count", time_window=timedelta(days=180)),
        Aggregation(column="transaction_amount", function="last", time_window=timedelta(days=180)),
        Aggregation(column="transaction_amount", function="max", time_window=timedelta(days=365 * 4)),
        Aggregation(column="transaction_amount", function="min", time_window=timedelta(days=365 * 4)),
    ],
)
The Stream Feature View above aggregates transaction_amount over several time windows (30 days, 180 days, and 4 years), each of which has a fixed length.
These windows slide forward continuously, which gives you the freshest possible streaming time window aggregation features in Tecton.
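To make the semantics concrete, here is a minimal pure-Python sketch of what a continuously sliding window computes at a given request time. It does not use Tecton. The event list and the `aggregate_window` helper are hypothetical, and the half-open window boundary `(request_time - window, request_time]` is an assumption chosen for illustration, not a statement about Tecton's internals.

```python
from datetime import datetime, timedelta

# Hypothetical events for one user: (timestamp, transaction_amount), sorted by time.
events = [
    (datetime(2023, 1, 1), 10.0),
    (datetime(2023, 3, 1), 25.0),
    (datetime(2023, 5, 20), 5.0),
    (datetime(2023, 6, 1), 40.0),
]


def aggregate_window(events, request_time, window):
    """Aggregate amounts whose timestamps fall in (request_time - window, request_time].

    The boundary convention is an illustrative assumption.
    """
    start = request_time - window
    amounts = [amt for ts, amt in events if start < ts <= request_time]
    return {
        "sum": sum(amounts),
        "count": len(amounts),
        # Events are time-sorted, so the final element is the most recent one.
        "last": amounts[-1] if amounts else None,
        "max": max(amounts) if amounts else None,
        "min": min(amounts) if amounts else None,
    }


# A continuously sliding window is anchored at the exact request time,
# so two requests a few seconds apart can see different results.
now = datetime(2023, 6, 15)
print(aggregate_window(events, now, timedelta(days=30)))
print(aggregate_window(events, now, timedelta(days=180)))
```

Here the 30-day window covers only the two most recent events, while the 180-day window reaches back far enough to include all four.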
Sliding Windows that move by a fixed interval
If you don't want your time windows to slide forward continuously, you can instead have them move forward by a fixed interval.
You specify the interval with the aggregation_interval parameter.
Example:
from tecton import stream_feature_view, FilteredSource, Aggregation
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream
from datetime import datetime, timedelta

# The following defines several sliding time window aggregations over a user's transaction amounts
@stream_feature_view(
    source=FilteredSource(transactions_stream),
    entities=[user],
    mode="spark_sql",
    aggregation_interval=timedelta(minutes=10),  # Defines how frequently feature values get updated in the online store
    batch_schedule=timedelta(days=1),  # Defines how frequently batch jobs are scheduled to ingest into the offline store
    aggregations=[
        Aggregation(column="amt", function="sum", time_window=timedelta(hours=1)),
        Aggregation(column="amt", function="sum", time_window=timedelta(days=1)),
        Aggregation(column="amt", function="sum", time_window=timedelta(days=3)),
        Aggregation(column="amt", function="mean", time_window=timedelta(hours=1)),
        Aggregation(column="amt", function="mean", time_window=timedelta(days=1)),
        Aggregation(column="amt", function="mean", time_window=timedelta(days=3)),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    description="Transaction amount statistics and total over a series of time windows, updated every 10 minutes.",
)
def user_transaction_amount_metrics(transactions):
    return f"""
        SELECT
            user_id,
            amt,
            timestamp
        FROM
            {transactions}
        """
The time windows of this StreamFeatureView move forward by a fixed interval of 10 minutes, as specified by aggregation_interval.
As of Tecton 0.7, time windows that move forward by a fixed interval are supported only for Spark-based Stream Feature Views.
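One way to picture a window that moves by a fixed interval is that the window's end snaps back to the most recent interval boundary, so every request that falls inside the same 10-minute bucket sees the same window. The sketch below is plain Python, not Tecton code. The `window_bounds` helper is hypothetical, and aligning the interval boundaries to the Unix epoch is an assumption made for illustration.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_bounds(request_time, window, aggregation_interval):
    """Return (start, end) of the window visible at request_time.

    Assumes (for illustration) that window ends are aligned to whole
    multiples of aggregation_interval since the Unix epoch.
    """
    elapsed = request_time - EPOCH
    # timedelta // timedelta yields an int: the number of complete intervals elapsed.
    end = EPOCH + (elapsed // aggregation_interval) * aggregation_interval
    return end - window, end


interval = timedelta(minutes=10)
one_hour = timedelta(hours=1)

# Two requests in the same 10-minute bucket see the same 1-hour window...
a = window_bounds(datetime(2022, 5, 1, 12, 3, tzinfo=timezone.utc), one_hour, interval)
b = window_bounds(datetime(2022, 5, 1, 12, 9, 59, tzinfo=timezone.utc), one_hour, interval)
# ...while a request at the next boundary sees a window shifted by one interval.
c = window_bounds(datetime(2022, 5, 1, 12, 10, tzinfo=timezone.utc), one_hour, interval)
print(a == b, c[1] - a[1])
```

In the continuous case, by contrast, the window end would simply be the request time itself.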