Stream Feature View Examples
Row-Level SQL Transformation
from tecton import stream_feature_view
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream
from datetime import datetime, timedelta


@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="spark_sql",
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=30),
    description="Last user transaction amount (stream calculated)",
)
def last_transaction_amount_sql(transactions):
    return f"""
        SELECT
            timestamp,
            user_id,
            amt
        FROM
            {transactions}
        """
Row-Level PySpark Transformation
from tecton import stream_feature_view
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream
from datetime import datetime, timedelta


@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode="pyspark",
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=30),
    description="Last user transaction amount (stream calculated)",
)
def last_transaction_amount_pyspark(transactions):
    return transactions.select("timestamp", "user_id", "amt")
Time-Windowed Aggregations
from tecton import stream_feature_view, FilteredSource, Aggregation
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream
from datetime import datetime, timedelta


# The following defines several sliding time window aggregations over a user's transaction amounts.
@stream_feature_view(
    source=FilteredSource(transactions_stream),
    entities=[user],
    mode="spark_sql",
    aggregation_interval=timedelta(minutes=10),  # Defines how frequently feature values get updated in the online store
    batch_schedule=timedelta(
        days=1
    ),  # Defines how frequently batch jobs are scheduled to ingest into the offline store
    aggregations=[
        Aggregation(column="amt", function="sum", time_window=timedelta(hours=1)),
        Aggregation(column="amt", function="sum", time_window=timedelta(days=1)),
        Aggregation(column="amt", function="sum", time_window=timedelta(days=3)),
        Aggregation(column="amt", function="mean", time_window=timedelta(hours=1)),
        Aggregation(column="amt", function="mean", time_window=timedelta(days=1)),
        Aggregation(column="amt", function="mean", time_window=timedelta(days=3)),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    description="Transaction amount statistics and total over a series of time windows, updated every 10 minutes.",
)
def user_transaction_amount_metrics(transactions):
    return f"""
        SELECT
            user_id,
            amt,
            timestamp
        FROM
            {transactions}
        """
Time-Windowed Aggregations with StreamProcessingMode.CONTINUOUS
Please see the Stream Processing Mode documentation for details on how StreamProcessingMode works.
Feature Definition Example
- Set stream_processing_mode=StreamProcessingMode.CONTINUOUS to enable continuous event processing.
- Optionally set instance_availability="on_demand" within the stream_cluster_config. Spot Instances may lead to feature processing delays due to spot termination or waiting for an available instance, so On-Demand Instances deliver more consistent performance. A sketch of this cluster configuration follows the example below.
This example Feature View shows how to configure the decorator parameters.
from tecton import stream_feature_view, FilteredSource, Aggregation, StreamProcessingMode
from fraud.entities import user
from fraud.data_sources.transactions import transactions_stream
from datetime import datetime, timedelta


# The following defines a continuous streaming feature.
# It counts the number of transactions per user over 1-minute, 30-minute, and 1-hour time windows.
# The expected freshness for these features is ~5 seconds.
@stream_feature_view(
    source=FilteredSource(transactions_stream),
    entities=[user],
    mode="spark_sql",
    stream_processing_mode=StreamProcessingMode.CONTINUOUS,
    aggregations=[
        Aggregation(column="transaction", function="count", time_window=timedelta(minutes=1)),
        Aggregation(column="transaction", function="count", time_window=timedelta(minutes=30)),
        Aggregation(column="transaction", function="count", time_window=timedelta(hours=1)),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    description="Number of transactions a user has made recently",
)
def user_continuous_transaction_count(transactions):
    return f"""
        SELECT
            user_id,
            1 as transaction,
            timestamp
        FROM
            {transactions}
        """
Stream Data Source Configuration
If your stream data source is Kinesis, we suggest lowering the default buffering to avoid delays in event processing. Here are some suggested parameter values for a KinesisConfig:
- For Databricks users: maxFetchDuration="200ms", maxFetchRate="2", minFetchPeriod="200ms"
- For EMR users: kinesis.executor.idleTimeBetweenReadsInMs="200", kinesis.executor.maxFetchTimeInMs="200"
This example data source shows how to configure the stream options on a KinesisConfig with Databricks.
from tecton import (
    HiveConfig,
    KinesisConfig,
    StreamSource,
    BatchSource,
    DatetimePartitionColumn,
)
from datetime import timedelta


def raw_data_deserialization(df):
    from pyspark.sql.functions import col, from_json, from_utc_timestamp, when
    from pyspark.sql.types import (
        StructType,
        StructField,
        StringType,
        DoubleType,
        TimestampType,
        BooleanType,
        IntegerType,
    )
    payload_schema = StructType(
        [
            StructField("user_id", StringType(), False),
            StructField("transaction_id", StringType(), False),
            StructField("category", StringType(), False),
            StructField("amt", StringType(), False),
            StructField("is_fraud", StringType(), False),
            StructField("merchant", StringType(), False),
            StructField("merch_lat", StringType(), False),
            StructField("merch_long", StringType(), False),
            StructField("timestamp", StringType(), False),
        ]
    )
    return (
        df.selectExpr("cast (data as STRING) jsonData")
        .select(from_json("jsonData", payload_schema).alias("payload"))
        .select(
            col("payload.user_id").alias("user_id"),
            col("payload.transaction_id").alias("transaction_id"),
            col("payload.category").alias("category"),
            col("payload.amt").cast("double").alias("amt"),
            col("payload.is_fraud").cast("long").alias("is_fraud"),
            col("payload.merchant").alias("merchant"),
            col("payload.merch_lat").cast("double").alias("merch_lat"),
            col("payload.merch_long").cast("double").alias("merch_long"),
            from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
        )
    )


partition_columns = [
    DatetimePartitionColumn(column_name="partition_0", datepart="year", zero_padded=True),
    DatetimePartitionColumn(column_name="partition_1", datepart="month", zero_padded=True),
    DatetimePartitionColumn(column_name="partition_2", datepart="day", zero_padded=True),
]


batch_config = HiveConfig(
    database="demo_fraud_v2",
    table="transactions",
    timestamp_field="timestamp",
    datetime_partition_columns=partition_columns,
)


transactions_stream = StreamSource(
    name="transactions_stream",
    stream_config=KinesisConfig(
        stream_name="tecton-demo-fraud-data-stream",
        region="us-west-2",
        initial_stream_position="latest",
        watermark_delay_threshold=timedelta(hours=24),
        timestamp_field="timestamp",
        post_processor=raw_data_deserialization,
        options={"roleArn": "arn:aws:iam::706752053316:role/tecton-demo-fraud-data-cross-account-kinesis-ro"},
    ),
    batch_config=batch_config,
)


transactions_batch = BatchSource(
    name="transactions_batch",
    batch_config=batch_config,
)
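For EMR users, only the options mapping changes. Below is a sketch of the same stream configuration with the EMR buffering values suggested above; the variable name is illustrative.
# Sketch: identical to the KinesisConfig above, but with the EMR buffering
# options in place of the Databricks ones.
transactions_stream_config_emr = KinesisConfig(
    stream_name="tecton-demo-fraud-data-stream",
    region="us-west-2",
    initial_stream_position="latest",
    watermark_delay_threshold=timedelta(hours=24),
    timestamp_field="timestamp",
    post_processor=raw_data_deserialization,
    options={
        "roleArn": "arn:aws:iam::706752053316:role/tecton-demo-fraud-data-cross-account-kinesis-ro",
        "kinesis.executor.idleTimeBetweenReadsInMs": "200",
        "kinesis.executor.maxFetchTimeInMs": "200",
    },
)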