Skip to main content
Version: 0.6

tecton.stream_feature_view

Summary​

Declare a Stream Feature View.

Parameters​

  • name (Optional[str]) – Unique, human friendly name that identifies the FeatureView. Defaults to the function name.

  • description (Optional[str]) – A human readable description. (Default: None)

  • owner (Optional[str]) – Owner name (typically the email of the primary maintainer). (Default: None)

  • tags (Optional[Dict[str, str]]) – Tags associated with this Tecton Object (key-value pairs of arbitrary metadata). (Default: None)

  • prevent_destroy (bool) – If True, this Tecton object will be blocked from being deleted or re-created (i.e. a destructive update) during tecton plan/apply. To remove or update this object, prevent_destroy must be first set to False via a separate tecton apply. prevent_destroy can be used to prevent accidental changes such as inadvertently deleting a Feature Service used in production or recreating a Feature View that triggers expensive rematerialization jobs. prevent_destroy also blocks changes to dependent Tecton objects that would trigger a recreate of the tagged object, e.g. if prevent_destroy is set on a Feature Service,that will also prevent deletions or re-creates of Feature Views used in that service. prevent_destroy is only enforced in live (i.e. non-dev) workspaces. (Default: False)

  • source (Union[StreamSource, PushSource, FilteredSource]) – The data source input to the feature view.

  • entities (Sequence[Entity]) – The entities this feature view is associated with.

  • mode (str) – Whether the annotated function is a pipeline function (“pipeline” mode) or a transformation function (“spark_sql” or “pyspark” mode). For the non-pipeline mode, an inferred transformation will also be registered.

  • aggregation_interval (Optional[timedelta]) – How frequently the feature value is updated (for example, “1h” or “6h”) (Default: None)

  • aggregations (Optional[Sequence[Aggregation]]) – A list of Aggregation structs. (Default: None)

  • stream_processing_mode (Optional[StreamProcessingMode]) – Whether aggregations should be “batched” in time intervals or be updated continuously. Continuously aggregated features are fresher but more expensive. One of StreamProcessingMode.TIME_INTERVAL or StreamProcessingMode.CONTINUOUS. Defaults to StreamProcessingMode.TIME_INTERVAL.

  • aggregation_mode (Optional[StreamProcessingMode]) – Deprecated. Use stream_processing_mode instead.

  • online (bool) – Whether the feature view should be materialized to the online feature store. (Default: False)

  • offline (bool) – Whether the feature view should be materialized to the offline feature store. (Default: False)

  • ttl (Optional[timedelta]) – The TTL (or “look back window”) for features defined by this feature view. This parameter determines how long features will live in the online store and how far to “look back” relative to a training example’s timestamp when generating offline training sets. Shorter TTLs improve performance and reduce costs. (Default: None)

  • feature_start_time (Optional[datetime]) – When materialization for this feature view should start from. Required if offline=true. (Default: None)

  • batch_trigger (BatchTriggerType) – Defines the mechanism for initiating batch materialization jobs. One of BatchTriggerType.SCHEDULED or BatchTriggerType.MANUAL. The default value is BatchTriggerType.SCHEDULED, where Tecton will run materialization jobs based on the schedule defined by the batch_schedule parameter. If set to BatchTriggerType.MANUAL, then batch materialization jobs must be explicitly initiated by the user through either the Tecton SDK or Airflow operator.

  • batch_schedule (Optional[timedelta]) – The interval at which batch materialization should be scheduled. The batch schedule must not include fractional seconds. (Default: None)

  • online_serving_index (Optional[Sequence[str]]) – (Advanced) Defines the set of join keys that will be indexed and queryable during online serving. (Default: None)

  • batch_compute (Union[DatabricksClusterConfig, EMRClusterConfig, DatabricksJsonClusterConfig, EMRJsonClusterConfig, None]) – Batch materialization cluster configuration. (Default: None)

  • stream_compute (Union[DatabricksClusterConfig, EMRClusterConfig, DatabricksJsonClusterConfig, EMRJsonClusterConfig, None]) – Streaming materialization cluster configuration. (Default: None)

  • offline_store (Union[ParquetConfig, DeltaConfig, None]) – Configuration for how data is written to the offline feature store. (Default: ParquetConfig(subdirectory_override=None))

  • online_store (Union[DynamoConfig, RedisConfig, None]) – Configuration for how data is written to the online feature store. (Default: None)

  • monitor_freshness (bool) – If true, enables monitoring when feature data is materialized to the online feature store. (Default: False)

  • expected_feature_freshness (Optional[timedelta]) – Threshold used to determine if recently materialized feature data is stale. Data is stale if now - most_recent_feature_value_timestamp > expected_feature_freshness. For feature views using Tecton aggregations, data is stale if now - round_up_to_aggregation_interval(most_recent_feature_value_timestamp) > expected_feature_freshness. Where round_up_to_aggregation_interval() rounds up the feature timestamp to the end of the aggregation_interval. Value must be at least 2 times aggregation_interval. If not specified, a value determined by the Tecton backend is used. (Default: None)

  • alert_email (Optional[str]) – Email that alerts for this FeatureView will be sent to. (Default: None)

  • timestamp_field (Optional[str]) – The column name that refers to the timestamp for records that are produced by the feature view. This parameter is optional if exactly one column is a Timestamp type. (Default: None)

  • max_batch_aggregation_interval (Optional[timedelta]) – (Advanced) The time interval for which each backfill job will run to materialize feature data. This affects the number of backfill jobs that will run, which is (<feature registration time> - feature_start_time) / max_batch_aggregation_interval. Configuring the max_batch_aggregation_interval parameter appropriately will help to optimize large backfill jobs. If this parameter is not specified, then 10 backfill jobs will run (the default).

  • output_stream (Optional[OutputStream]) – Configuration for a stream to write feature outputs to, specified as a tecton.framework.configs.KinesisOutputStream or tecton.framework.configs.KafkaOutputStream. (Default: None)

  • schema (Optional[List[Field]]) – Tecton schema specifying the expected output of the feature view. This should only be specified when defining stream feature views with push sources and transformations. (Default: None)

Returns​

An object of type tecton.StreamFeatureView.

Example​

from datetime import datetime, timedelta
from entities import user
from transactions_stream import transactions_stream
from tecton import Aggregation, FilteredSource, stream_feature_view


@stream_feature_view(
source=FilteredSource(transactions_stream),
entities=[user],
mode="spark_sql",
ttl=timedelta(days=30),
online=True,
offline=True,
batch_schedule=timedelta(days=1),
feature_start_time=datetime(2020, 10, 10),
description="Features about the users most recent transaction in the past 60 days. Updated continuously.",
)
def user_last_transaction_features(transactions_stream):
return f"""
SELECT
USER_ID,
TIMESTAMP,
AMOUNT as LAST_TRANSACTION_AMOUNT,
CATEGORY as LAST_TRANSACTION_CATEGORY
FROM
{transactions_stream}
"""

Example using aggregates​

from datetime import datetime, timedelta
from entities import user
from transactions_stream import transactions_stream
from tecton import Aggregation, FilteredSource, stream_feature_view


@stream_feature_view(
source=FilteredSource(transactions_stream),
entities=[user],
mode="spark_sql",
aggregation_interval=timedelta(minutes=10),
aggregations=[
Aggregation(column="AMOUNT", function="mean", time_window=timedelta(hours=1)),
Aggregation(column="AMOUNT", function="mean", time_window=timedelta(hours=24)),
Aggregation(column="AMOUNT", function="mean", time_window=timedelta(hours=72)),
Aggregation(column="AMOUNT", function="sum", time_window=timedelta(hours=1)),
Aggregation(column="AMOUNT", function="sum", time_window=timedelta(hours=24)),
Aggregation(column="AMOUNT", function="sum", time_window=timedelta(hours=72)),
],
online=True,
feature_start_time=datetime(2020, 10, 10),
description="Transaction amount statistics and total over a series of time windows, updated every ten minutes.",
)
def user_recent_transaction_aggregate_features(transactions_stream):
return f"""
SELECT
USER_ID,
AMOUNT,
TIMESTAMP
FROM
{transactions_stream}
"""

Was this page helpful?

🧠 Hi! Ask me anything about Tecton!

Floating button icon