Time-Windowed Aggregations with Stream Ingest API
The Stream Ingest API is specifically designed to integrate with Tecton's Aggregation Engine. See the Aggregations Reference for the full list of supported aggregation functions.
from datetime import timedelta, datetime
from tecton import StreamFeatureView, Aggregation
from ads.entities import user
from ads.data_sources.ad_impressions import impressions_event_source
user_click_counts_wafv = StreamFeatureView(
name="user_click_counts_wafv",
source=impressions_event_source,
entities=[user],
online=True,
offline=True,
feature_start_time=datetime(2023, 1, 1),
aggregations=[
Aggregation(column="clicked", function="count", time_window=timedelta(hours=1)),
Aggregation(column="clicked", function="count", time_window=timedelta(hours=24)),
Aggregation(column="clicked", function="count", time_window=timedelta(hours=72)),
],
description="The count of ad clicks for a user",
)
To learn more about the performance of aggregations with the Stream Ingest API, see the Continuous Windows section on aggregation performance.