Training Data Generation with Stream Ingest API
To backfill data into the online and/or offline store with the Stream Ingest API, a `batch_config` can be included in the `PushSource`. It's important to note that the schema of the Batch Source must contain at least all the columns defined in the schema of the Push Source. Alternatively, a `post_processor` can be used in the `batch_config` to align column names and data types between the Batch Source and the Push Source schema.
Below is an example `PushSource`, `impressions_event_source`, with a `batch_config` for backfilling data to the online and offline store. The `post_processor` ensures that the schema of the Batch Source matches the `input_schema` of the Push Source.
```python
from tecton import PushSource, HiveConfig
from tecton.types import String, Int64, Timestamp, Field

input_schema = [
    Field(name="user_id", dtype=String),
    Field(name="timestamp", dtype=Timestamp),
    Field(name="clicked", dtype=Int64),
]


def post_processor_batch(df):
    from pyspark.sql.functions import col

    df = df.select(
        col("user_id").cast("string").alias("user_id"),
        col("timestamp").cast("timestamp").alias("timestamp"),
        col("clicked").cast("long").alias("clicked"),
    )
    return df


impressions_event_source = PushSource(
    name="impressions_event_source",
    schema=input_schema,
    batch_config=HiveConfig(
        database="demo_ads",
        table="impressions_batch",
        post_processor=post_processor_batch,
        timestamp_field="timestamp",
    ),
)
```
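As an aside, the column-alignment pattern that `post_processor_batch` applies with PySpark can be illustrated in plain pandas. This is a standalone sketch for intuition only, not Tecton code: the `raw` DataFrame and the `post_processor_pandas` helper are hypothetical stand-ins for a Batch Source whose column types don't yet match the declared Push Source schema.

```python
import pandas as pd

# Hypothetical raw batch rows whose dtypes don't yet match the Push Source schema.
raw = pd.DataFrame(
    {
        "user_id": [123, 456],  # ints, but the schema declares String
        "timestamp": ["2022-01-01", "2022-01-02"],  # strings, not timestamps
        "clicked": ["1", "0"],  # strings, but the schema declares Int64
    }
)


def post_processor_pandas(df):
    # Cast each column to the type declared in the Push Source schema,
    # mirroring what the PySpark post_processor does with cast/alias.
    return df.assign(
        user_id=df["user_id"].astype(str),
        timestamp=pd.to_datetime(df["timestamp"]),
        clicked=df["clicked"].astype("int64"),
    )


aligned = post_processor_pandas(raw)
print(aligned.dtypes)
```

The same idea carries over to the Spark version: every column is selected, cast, and renamed so the batch rows land in the store with exactly the declared schema.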
Below is a Stream Feature View that uses the above Push Source, with additional configurations such as `batch_schedule` and an optional `manual_trigger_backfill_end_time` for efficient backfills.
```python
from datetime import datetime, timedelta

from tecton import StreamFeatureView
from ads.entities import user
from ads.data_sources.ad_impressions import impressions_event_source

click_events_fv = StreamFeatureView(
    name="click_events_fv",
    source=impressions_event_source,
    entities=[user],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 1, 1),
    batch_schedule=timedelta(days=1),
    manual_trigger_backfill_end_time=datetime(2023, 7, 1),
    ttl=timedelta(days=7),
    description="The count of ad clicks for a user",
)
```
- When a `batch_config` is defined in the Push Source, any records in the associated Batch Source will be backfilled to the online and/or offline store, from the `feature_start_time` to the `manual_trigger_backfill_end_time`.
- If changes are made to `feature_start_time` or `manual_trigger_backfill_end_time`, Tecton will intelligently schedule jobs to backfill only unmaterialized data (instead of fully rematerializing the entire Feature View).
- If `manual_trigger_backfill_end_time` is absent from the Feature View definition, Tecton will not automatically schedule backfill materialization jobs; instead, you will need to trigger the jobs manually. Please refer here for more information on manually triggering materialization jobs.