Create a Push Data Source
This guide shows you how to create a PushSource that processes records sent to
the Stream Ingest API.
Defining a Push Data Source
A PushSource is used by a StreamFeatureView to generate feature values using
data from both the stream and batch sources. A StreamFeatureView applies the
same transformation to both data sources.
In addition to the metadata needed to define Tecton objects, a PushSource
needs a schema and, optionally, a batch_config:
- schema: The schema of the push source, that is, the schema of the events that are pushed to Tecton and written to the online store. Tecton uses this schema to validate the JSON that users send. The types supported by the API are the same ones supported by the Feature Server.
- batch_config: The configuration for a batch source that backs the push source. The batch source must contain at least all the columns defined in the schema.
  - You can use post_processor to match values to the push source schema. The batch source defined by the batch_config contains the stream's historical data and is used to backfill feature data into the online store.
  - If batch_config is not specified, the only way to populate the data source for testing is by sending records to the Stream Ingest API.
  - The value of batch_config can be a configuration object (such as HiveConfig) or a Data Source Function. A Data Source Function offers more flexibility than an object.
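To make the role of the schema concrete, here is a minimal sketch of the kind of check that schema validation implies. It is plain Python and does not use Tecton; `SCHEMA`, `validate_record`, and the assumption that timestamps arrive as ISO 8601 strings are all hypothetical and only illustrate the idea, not how Tecton validates records internally.

```python
from datetime import datetime

# Hypothetical stand-in for a push source schema: field name -> expected type.
# The "timestamp" marker assumes timestamps are sent as ISO 8601 strings.
SCHEMA = {"user_id": str, "timestamp": "timestamp", "clicked": int}


def validate_record(record, schema=SCHEMA):
    """Return a list of problems; an empty list means the record conforms."""
    errors = []
    for name, expected in schema.items():
        if name not in record:
            errors.append(f"missing field: {name}")
            continue
        value = record[name]
        if expected == "timestamp":
            try:
                datetime.fromisoformat(value)
            except (TypeError, ValueError):
                errors.append(f"{name}: not an ISO 8601 timestamp")
        elif isinstance(value, bool) or not isinstance(value, expected):
            # bool is a subclass of int in Python, so it is rejected explicitly
            errors.append(f"{name}: expected {expected.__name__}")
    return errors
```

Running a check like this locally before sending records may help catch malformed events early, but the validation performed by the Stream Ingest API remains the source of truth.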
The following example declares a PushSource object.
from tecton import PushSource, HiveConfig
from tecton.types import String, Int64, Timestamp, Field

# Schema of the events pushed to Tecton
input_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
]


def post_processor_batch(df):
    # Cast and select the batch columns so they match the push source schema
    from pyspark.sql.functions import col

    df = df.select(
        col("user_id").cast("string").alias("user_id"),
        col("timestamp").cast("timestamp").alias("timestamp"),
        col("clicked").cast("long").alias("clicked"),
    )
    return df


impressions_event_source = PushSource(
    name="impressions_event_source",
    schema=input_schema,
    # Batch source holding the stream's historical data, used for backfills
    batch_config=HiveConfig(
        database="demo_ads",
        table="impressions_batch",
        post_processor=post_processor_batch,
        timestamp_field="timestamp",
    ),
    description="Sample Push Source for ad impression events",
)
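The requirement that the batch source contain at least every column defined in the schema can be sanity-checked before applying. The following is a small sketch in plain Python; `missing_batch_columns` and the example column list are hypothetical and stand in for whatever column names your batch table actually exposes.

```python
def missing_batch_columns(schema_fields, batch_columns):
    """Return schema field names absent from the batch source, in schema order."""
    available = set(batch_columns)
    return [name for name in schema_fields if name not in available]


schema_fields = ["user_id", "timestamp", "clicked"]
# Hypothetical column list of the raw batch table; extra columns are allowed
batch_columns = ["user_id", "timestamp", "clicked", "ad_id"]

missing = missing_batch_columns(schema_fields, batch_columns)
if missing:
    raise ValueError(f"batch source is missing schema columns: {missing}")
```

An empty result means the batch source covers the schema; any type mismatches would still need to be handled separately, for example in the post_processor.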
You can then create a StreamFeatureView with the PushSource you created to
test it.
from datetime import datetime, timedelta
from tecton import StreamFeatureView, FilteredSource, BatchTriggerType

# `user_id` is assumed to be a Tecton Entity defined elsewhere in the feature repository
click_push_fv = StreamFeatureView(
    name="click_events_fv",
    source=FilteredSource(impressions_event_source),
    entities=[user_id],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 10, 10),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=7),
    description="The count of ad clicks for a user",
    batch_trigger=BatchTriggerType.MANUAL,
)
- When the online parameter is set to False, events sent to the Stream Ingest API are still validated but are not written to the Online Store.
Once applied successfully, this Push Source and Stream Feature View are ready
for Streaming Ingest. Note that there may be a delay of a few seconds after
tecton apply before the API can accept records for the Push Source.
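Because of that short delay, a client that sends records immediately after applying may want to retry a few times. Here is a minimal sketch of such a retry loop. It is not part of the Tecton SDK: `send_with_retry` is a hypothetical helper, and `send` is a hypothetical callable that posts one record to the Stream Ingest API and raises an exception when the request is not accepted. The attempt count and delay are arbitrary illustrative defaults.

```python
import time


def send_with_retry(send, record, attempts=5, delay_seconds=2.0, sleep=time.sleep):
    """Call send(record) until it succeeds or the attempts run out.

    `send` is a hypothetical callable that posts one record and raises
    an exception when the request is not accepted.
    """
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return send(record)
        except Exception as error:  # narrow this to your HTTP client's error type
            last_error = error
            if attempt < attempts:
                sleep(delay_seconds)  # wait before the next attempt
    raise RuntimeError(f"record not accepted after {attempts} attempts") from last_error
```

Passing `sleep` as a parameter keeps the helper easy to test without real waiting; in production code you would likely retry only on the specific errors that indicate the Push Source is not yet available.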