Python Transformations with the Stream Ingest API
The Stream Ingest API can be used with Tecton's Serverless Python Engine to run transformations at ingestion time (when records are sent to the Stream Ingest API), and with the Aggregation Engine to define efficient time-windowed aggregations. The same transformations are also run offline during backfills.
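Because a transformation is an ordinary Python function, the same logic can be applied to a single record as it is ingested and replayed over historical records during a backfill. The sketch below illustrates that idea without Tecton. The names `add_clicked_squared`, `ingest`, and `backfill` are hypothetical and used only for illustration; they are not part of Tecton's API.

```python
from datetime import datetime
from typing import Any, Dict, List

Record = Dict[str, Any]


def add_clicked_squared(record: Record) -> Record:
    """Hypothetical row-level transformation: derive clicked_squared from clicked."""
    out = dict(record)  # copy so the caller's record is not mutated
    out["clicked_squared"] = out["clicked"] ** 2
    return out


def ingest(record: Record) -> Record:
    """Online path (illustrative only): transform one record as it arrives."""
    return add_clicked_squared(record)


def backfill(records: List[Record]) -> List[Record]:
    """Offline path (illustrative only): replay the same function over history."""
    return [add_clicked_squared(r) for r in records]


history = [
    {"user_id": "u1", "timestamp": datetime(2022, 10, 10, 9), "clicked": 1},
    {"user_id": "u1", "timestamp": datetime(2022, 10, 10, 10), "clicked": 0},
]
live = {"user_id": "u1", "timestamp": datetime(2022, 10, 11, 8), "clicked": 1}

print([r["clicked_squared"] for r in backfill(history)])  # [1, 0]
print(ingest(live)["clicked_squared"])  # 1
```

Since both paths call the same function, a record produces the same derived value whether it arrives live or is reprocessed later.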
The following example creates a Stream Feature View that processes events sent through the Stream Ingest API.
```python
from datetime import datetime, timedelta

from tecton import stream_feature_view
from tecton.types import Field, String, Timestamp, Int64

from ads.entities import user
from ads.data_sources.ad_impressions import impressions_event_source

# Schema of each record *after* the transformation below has run
output_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
    Field(name="clicked_squared", dtype=Int64),
]


@stream_feature_view(
    source=impressions_event_source,
    entities=[user],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 10, 10),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=7),
    mode="python",
    schema=output_schema,
)
def content_keyword_click_counts_python(user_click_push_source):
    # In python mode, the input is a single record passed as a dict
    user_click_push_source["clicked_squared"] = user_click_push_source["clicked"] ** 2
    return user_click_push_source
```
- `output_schema` describes the schema of the records after transformation.
- Stream Ingest Feature Views support `python` and `pandas` mode transformations. With `mode="python"`, the transformation takes a dictionary as input and must return a dictionary. With `mode="pandas"`, the transformation takes a pandas DataFrame as input and must return a pandas DataFrame.
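To make the difference between the two modes concrete, here is a minimal sketch of the same transformation written once per mode and called directly on sample input outside Tecton. The function names are hypothetical; in a real Feature View the body would sit inside a function decorated with `@stream_feature_view` and `mode="python"` or `mode="pandas"`. The pandas version assumes pandas is installed.

```python
import pandas as pd


def clicked_squared_python(record: dict) -> dict:
    # mode="python": one record in as a dict, one dict out
    record["clicked_squared"] = record["clicked"] ** 2
    return record


def clicked_squared_pandas(df: pd.DataFrame) -> pd.DataFrame:
    # mode="pandas": a DataFrame in, a DataFrame out; the operation is vectorized over rows
    df["clicked_squared"] = df["clicked"] ** 2
    return df


row = clicked_squared_python({"user_id": "u1", "clicked": 1})
frame = clicked_squared_pandas(
    pd.DataFrame({"user_id": ["u1", "u2"], "clicked": [1, 0]})
)

print(row)  # {'user_id': 'u1', 'clicked': 1, 'clicked_squared': 1}
print(frame["clicked_squared"].tolist())  # [1, 0]
```

The logic is identical; only the container changes. Pandas mode lets column-wise operations run over many records at once, while python mode avoids DataFrame overhead for one record at a time.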
Transformations with Time-Windowed Aggregations
The following example shows a Stream Feature View that uses a Push Source and applies both a transformation and time-windowed aggregations.
```python
from datetime import datetime, timedelta

from tecton import stream_feature_view, Aggregation
from tecton.types import Field, String, Timestamp, Int64

from ads.entities import user
from ads.data_sources.ad_impressions import impressions_event_source

# Schema of each record after transformation and before aggregation
output_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
    Field(name="clicked_squared", dtype=Int64),
]


@stream_feature_view(
    source=impressions_event_source,
    entities=[user],
    online=True,  # With online=False, ingest requests are validated but nothing is written to the online store.
    offline=True,
    feature_start_time=datetime(2022, 10, 10),
    batch_schedule=timedelta(days=1),
    aggregations=[
        Aggregation(column="clicked", function="count", time_window=timedelta(hours=1)),
        Aggregation(column="clicked", function="count", time_window=timedelta(hours=12)),
        Aggregation(column="clicked_squared", function="sum", time_window=timedelta(hours=24)),
    ],
    description="The aggregated count of ad clicks for a user",
    mode="python",
    schema=output_schema,
)
def content_keyword_click_counts_python(user_click_push_source):
    # The transformation runs first; the aggregations above are computed over its output
    user_click_push_source["clicked_squared"] = user_click_push_source["clicked"] ** 2
    return user_click_push_source
```
When a Feature View uses both a transformation and time-windowed aggregations, `output_schema` (the value passed to `schema`) describes the records after transformation and before aggregation. Tecton computes the schema of the aggregated output automatically.
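As a rough illustration of what the three aggregations compute, the sketch below evaluates trailing-window aggregates for one user in plain Python. It is a simplified model, not Tecton's Aggregation Engine: it assumes each window covers events with timestamps in `[end - window, end)`, and the output keys are hypothetical labels rather than the feature names Tecton generates.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, List

# Transformed records, shaped like output_schema
events: List[Dict[str, Any]] = [
    {"user_id": "u1", "timestamp": datetime(2022, 10, 10, 2), "clicked": 1, "clicked_squared": 1},
    {"user_id": "u1", "timestamp": datetime(2022, 10, 10, 11, 30), "clicked": 0, "clicked_squared": 0},
    {"user_id": "u1", "timestamp": datetime(2022, 10, 10, 11, 45), "clicked": 1, "clicked_squared": 1},
]


def in_window(ts: datetime, end: datetime, window: timedelta) -> bool:
    # Assumed window semantics for this sketch: [end - window, end)
    return end - window <= ts < end


def aggregate(records: List[Dict[str, Any]], end: datetime) -> Dict[str, int]:
    def rows(window: timedelta) -> List[Dict[str, Any]]:
        return [r for r in records if in_window(r["timestamp"], end, window)]

    def count(column: str, window: timedelta) -> int:
        # Counts non-null values, so a record with clicked == 0 still counts
        return sum(1 for r in rows(window) if r[column] is not None)

    def total(column: str, window: timedelta) -> int:
        return sum(r[column] for r in rows(window))

    # Hypothetical labels, not Tecton's generated feature names
    return {
        "clicked_count_1h": count("clicked", timedelta(hours=1)),
        "clicked_count_12h": count("clicked", timedelta(hours=12)),
        "clicked_squared_sum_24h": total("clicked_squared", timedelta(hours=24)),
    }


print(aggregate(events, datetime(2022, 10, 10, 12)))
# {'clicked_count_1h': 2, 'clicked_count_12h': 3, 'clicked_squared_sum_24h': 2}
```

The returned dictionary plays the role of the post-aggregation output that Tecton derives on its own; only the per-record shape going into the aggregations has to be declared in the schema.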