Stream Data Sources with Stream Ingest API
This guide shows you how to create a StreamSource
that processes records that
processes records pushed to Tecton's Stream Ingest API endpoint. See
Stream Data Sources with Kafka or Kinesis
if you prefer Tecton to connect directly to your existing streaming
infrastructure and pull records from it.
Defining a Stream Source with a PushConfig​
A StreamSource
is used by a
StreamFeatureView
to compute
feature values from a continuous streaming data source. It supports the near
real-time calculation of features with a freshness of less than 1s.
To enable point-in-time correct training data generation, and backfills of newly
created features using historical event data, a StreamSource
needs to
configure one of two possible offline event logs:
- Self-managed offline event log: If you already maintain an offline log of historical events, you can simply point Tecton at that data source.
- Tecton-managed offline event log: Alternatively, you can instruct the
StreamSource
to log all ingested events to the offline store.
You can set the following fields on a StreamSource
:
stream_config
: This configures yourStreamSource
'sPushConfig
and allows you to specify if you want Tecton to manage your offline event log.batch_config
: If you maintain an external offline event log, you use this parameter to point Tecton at it.schema
: This is the schema of the stream source, i.e. the schema of the events that will be sent to the Stream Ingest API. Tecton will use this schema to validate all ingested JSON records. Data in an existing offline event log must match the schema.
Example of a Stream Source with a self-managed offline event log​
The following example declares a StreamSource
that can be used to ingest
transaction events. As you can see, the historical event data can be found in a
parquet file on S3.
from tecton import (
FileConfig,
stream_feature_view,
StreamSource,
PushConfig,
)
from tecton.types import Field, String, Timestamp, Float64
transactions = StreamSource(
name="transactions",
stream_config=PushConfig(log_offline=False), # You don't want a Tecton-managed offline event log
batch_config=FileConfig( # This is the location of your self-managed offline event log
uri="s3://anonymous@tecton.ai.public/tutorials/fraud_demo/transactions/data.pq",
file_format="parquet",
timestamp_field="timestamp",
),
schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
)
Please see Tecton's Building Streaming Features tutorial for an end to end example.
Example of a Stream Source with a Tecton-managed offline event log​
The following example declares a StreamSource
that can be used to ingest
transaction events. As you can see, no batch_config
is specified. Tecton
manages the offline event log for you.
from tecton import (
FileConfig,
stream_feature_view,
StreamSource,
PushConfig,
)
from tecton.types import Field, String, Timestamp, Float64
transactions = StreamSource(
name="transactions",
stream_config=PushConfig(log_offline=True), # You do want a Tecton-managed offline event log
schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("amt", Float64)],
)