stream_feature_view
Summary​
Declare a Stream Feature View.
Parameters​
-
name
(Optional
[str
]) – Unique, human friendly name that identifies the FeatureView. Defaults to the function name. -
description
(Optional
[str
]) – A human readable description. (Default:None
) -
owner
(Optional
[str
]) – Owner name (typically the email of the primary maintainer). (Default:None
) -
tags
(Optional
[Dict
[str
,str
]]) – Tags associated with this Tecton Object (key-value pairs of arbitrary metadata). (Default:None
) -
prevent_destroy
(bool
) – If True, this Tecton object will be blocked from being deleted or re-created (i.e. a destructive update) during tecton plan/apply. To remove or update this object,prevent_destroy
must be set to False via the same tecton apply or a separate tecton apply.prevent_destroy
can be used to prevent accidental changes such as inadvertently deleting a Feature Service used in production or recreating a Feature View that triggers expensive rematerialization jobs.prevent_destroy
also blocks changes to dependent Tecton objects that would trigger a recreate of the tagged object, e.g. ifprevent_destroy
is set on a Feature Service,that will also prevent deletions or re-creates of Feature Views used in that service.prevent_destroy
is only enforced in live (i.e. non-dev) workspaces. (Default:False
) -
source
(Union
[StreamSource
,PushSource
,FilteredSource
]) – The data source input to the Feature View. -
entities
(Sequence
[Entity
]) – The entities this Feature View is associated with. -
mode
(str
) – Whether the annotated function is a pipeline function (“pipeline” mode) or a transformation function (“spark_sql”, “pyspark”, “snowflake_sql”, “snowpark”, or “pandas" mode). For the non-pipeline mode, an inferred transformation will also be registered. -
aggregation_interval
(Optional
[timedelta
]) – How frequently the feature value is updated (for example, “1h” or “6h”) (Default:None
) -
aggregations
(Optional
[Sequence
[Aggregation
]]) – A list ofAggregation
structs. (Default:None
) -
aggregation_secondary_key
(Optional[str]
) – the column name to use as the secondary key for aggregations. See Aggregation Secondary Key docs for more details. (Default:None
) -
stream_processing_mode
(Optional
[StreamProcessingMode
]) – Whether aggregations should be “batched” in time intervals or be updated continuously. Continuously aggregated features are fresher but more expensive. One ofStreamProcessingMode.TIME_INTERVAL
orStreamProcessingMode.CONTINUOUS
. If the data source has a PushConfig stream source then defaults to StreamProcessingMode.CONTINUOUS. Otherwise, defaults to StreamProcessingMode.TIME_INTERVAL. -
aggregation_mode
(Optional
[StreamProcessingMode
]) – Deprecated. Usestream_processing_mode
instead. -
online
(bool
) – Whether the Feature View should be materialized to the online feature store. (Default:False
) -
offline
(bool
) – Whether the Feature View should be materialized to the offline feature store. (Default:False
) -
ttl
(Optional
[timedelta
]) – The TTL (or “look back window”) for features defined by this Feature View. This parameter determines how long features will live in the online store and how far to “look back” relative to a training example’s timestamp when generating offline training sets. TTL should not be set for features with aggregations, since feature expiration is determined by the aggregation_interval. The default value is 'None' meaning no feature data will expire from the online store. When generating offline training datasets, the window to "look back" relative to the training example's timestamp will begin at the feature start time. (Default:None
) -
feature_start_time
(Optional
[datetime
]) – When materialization for this Feature View should start from. Required if eitheronline
oroffline
is true. (Default:None
) -
batch_trigger
(BatchTriggerType
) – Defines the mechanism for initiating batch materialization jobs. One ofBatchTriggerType.SCHEDULED
orBatchTriggerType.MANUAL
. The default value isBatchTriggerType.SCHEDULED
, where Tecton will run materialization jobs based on the schedule defined by thebatch_schedule
parameter. If set toBatchTriggerType.MANUAL
, then batch materialization jobs must be explicitly initiated by the user through either the Tecton SDK or Airflow operator. -
manual_trigger_backfill_end_time
(Optional
[datetime
]) – When backfill materialization for manually-triggered Stream Feature View should end. (Default:None
) -
batch_schedule
(Optional
[timedelta
]) – The interval at which batch materialization should be scheduled. The batch schedule must not include fractional seconds. (Default:None
) -
online_serving_index
(Optional
[Sequence
[str
]]) – (Advanced) Defines the set of join keys that will be indexed and queryable during online serving. (Default:None
) -
batch_compute
(Union
[DatabricksClusterConfig
,EMRClusterConfig
,DatabricksJsonClusterConfig
,EMRJsonClusterConfig
,DataprocJsonClusterConfig
,RiftBatchConfig
,None
]) – Configuration for the batch materialization cluster. (Default:None
) -
stream_compute
(Union
[DatabricksClusterConfig
,EMRClusterConfig
,DatabricksJsonClusterConfig
,EMRJsonClusterConfig
,None
]) – Streaming materialization cluster configuration. (Default:None
) -
offline_store
(Union
[OfflineStoreConfig
,DeltaConfig
,ParquetConfig
,None
]) – Configuration for Tecton's Offline Store. Default:OfflineStoreConfig(
staging_table_format=DeltaConfig(datetime.timedelta(days=1), subdirectory_override=None),
publish_full_features=False,
publish_start_time=None,
) -
online_store
(Union
[DynamoConfig
,RedisConfig
,None
]) – Configuration for how data is written to the online feature store. (Default:None
) -
monitor_freshness
(bool
) – If true, enables monitoring when feature data is materialized to the online feature store. (Default:False
) -
expected_feature_freshness
(Optional
[timedelta
]) – Threshold used to determine if recently materialized feature data is stale. Data is stale ifnow - most_recent_feature_value_timestamp > expected_feature_freshness
. For Feature Views using Tecton aggregations, data is stale ifnow - round_up_to_aggregation_interval(most_recent_feature_value_timestamp) > expected_feature_freshness
. Whereround_up_to_aggregation_interval()
rounds up the feature timestamp to the end of theaggregation_interval
. Value must be at least 2 timesaggregation_interval
. If not specified, a value determined by the Tecton backend is used. (Default:None
) -
alert_email
(Optional
[str
]) – Email that alerts for this FeatureView will be sent to. (Default:None
) -
data_quality_enabled
(bool
) – Enables Data Quality Metrics. (Default:True
) -
skip_default_expectations
(bool
) – Skips default Data Quality Validation. (Default:False
) -
timestamp_field
(Optional
[str
]) – The column name that refers to the timestamp for records that are produced by the Feature View. This parameter is optional if exactly one column is a Timestamp type. (Default:None
) -
max_backfill_interval
(Optional
[timedelta
]) – (Advanced) The time interval for which each backfill job will run to materialize feature data. This affects the number of backfill jobs that will run, which is(<feature registration time> - feature_start_time) / max_backfill_interval
. Configuring the max_backfill_interval parameter appropriately will help to optimize large backfill jobs. If this parameter is not specified, then 10 backfill jobs will run (the default). -
output_stream
(Optional
[OutputStream
]) – Configuration for a stream to write feature outputs to, specified as atecton.framework.configs.KinesisOutputStream
ortecton.framework.configs.KafkaOutputStream
. (Default:None
) -
schema
(Optional
[List
[Field
]]) – The expected output schema of the Feature View transformation. If provided andrun_transformation_validation=True
, Tecton will validate that the Feature View matches the expected schema. After setting theschema
for the first time, usetecton apply --suppress-recreates
to avoid recreating the Feature View. (Default:None
) -
run_transformation_validation
(bool
) – IfFalse
,schema
must be set and Tecton will skip validation of the transformation. Skipping validation can be useful to speed uptecton plan/apply
or for Feature Views that have issues with the validation process (e.g.pip
dependency conflicts). If a Data Source's dependent Feature Views all haverun_transformation_validation=False
, Data Source schema derivation will be skipped as well. (Default:True
for Feature Views executing on Spark or Snowflake;False
for Feature Views executing on Rift) -
tecton_materialization_runtime
(Optional
[str
]) - Version of the Tecton Materialization Runtime used for materialization jobs. Required on 0.8+ when materialization is enabled (withonline=True
oroffline=True
). Recommended to set to the SDK version the Feature View was applied with (e.g. "0.8.0"). (Default:None
) -
compaction_enabled
(bool
) – IfTrue
, Tecton will run a compaction job after each batch materialization job to materialize to the online store. This requires the use of DynamoDB and uses the ImportTable API. Because each batch job overwrites the online store, a larger compute cluster may be required. -
stream_tiling_enabled
(bool
) – IfFalse
, Tecton transforms and writes all events from the stream to the online store (same as stream_processing_mode=StreamProcessingMode.CONTINUOUS
) . IfTrue
, Tecton will store the partial aggregations of the events in the online store. Defaults toFalse
. -
options
(Optional
[Dict
[str
,str
]]) – A map of additional stream feature view options. (Default:None
) -
features
(Optional
[Union
[List
[Aggregate
]],List
[Attribute
]]) – A list of features which can either be a list ofAttribute
or a list ofAggregate
. (Default:None
) -
environment
(Optional
[str
]) – The custom environment in which materialization jobs will be run. Defaults toNone
, which means jobs will execute in the default Tecton environment.
Returns​
An object of type
StreamFeatureView
.
Example​
from datetime import datetime, timedelta
from entities import user
from transactions_stream import transactions_stream
from tecton import Aggregation, FilteredSource, stream_feature_view
from tecton.types import Field, Int64, String, Timestamp
@stream_feature_view(
source=FilteredSource(transactions_stream),
entities=[user],
mode="spark_sql",
ttl=timedelta(days=30),
online=True,
offline=True,
batch_schedule=timedelta(days=1),
feature_start_time=datetime(2020, 10, 10),
schema=[
Field("USER_ID", String),
Field("TIMESTAMP", Timestamp),
Field("LAST_TRANSACTION_AMOUNT", Int64),
Field("LAST_TRANSACTION_CATEGORY", String),
],
run_transformation_validation=True,
description="Features about the users most recent transaction in the past 30 days. Updated continuously.",
tecton_materialization_runtime="0.8.0",
)
def user_last_transaction_features(transactions_stream):
return f"""
SELECT
USER_ID,
TIMESTAMP,
AMOUNT as LAST_TRANSACTION_AMOUNT,
CATEGORY as LAST_TRANSACTION_CATEGORY
FROM
{transactions_stream}
"""
Example using aggregates​
from datetime import datetime, timedelta
from entities import user
from transactions_stream import transactions_stream
from tecton import Aggregation, FilteredSource, stream_feature_view, TimeWindow
from tecton.types import Int64, String, Timestamp, Field
@stream_feature_view(
source=FilteredSource(transactions_stream),
entities=[user],
mode="spark_sql",
aggregation_interval=timedelta(minutes=10),
aggregations=[
Aggregation(column="AMOUNT", function="mean", time_window=TimeWindow(window_size=timedelta(hours=1))),
Aggregation(column="AMOUNT", function="mean", time_window=TimeWindow(window_size=timedelta(hours=24))),
Aggregation(column="AMOUNT", function="mean", time_window=TimeWindow(window_size=timedelta(hours=72))),
Aggregation(column="AMOUNT", function="sum", time_window=TimeWindow(window_size=timedelta(hours=1))),
Aggregation(column="AMOUNT", function="sum", time_window=TimeWindow(window_size=timedelta(hours=24))),
Aggregation(column="AMOUNT", function="sum", time_window=TimeWindow(window_size=timedelta(hours=72))),
],
online=True,
feature_start_time=datetime(2020, 10, 10),
schema=[
Field("USER_ID", String),
Field("AMOUNT", Int64),
Field("TIMESTAMP", Timestamp),
],
description="Transaction amount statistics and total over a series of time windows, updated every ten minutes.",
tecton_materialization_runtime="0.8.0",
)
def user_recent_transaction_aggregate_features(transactions_stream):
return f"""
SELECT
USER_ID,
AMOUNT,
TIMESTAMP
FROM
{transactions_stream}
"""