tecton.StreamSource
Summary
A Tecton StreamSource, used to unify stream and batch data into Tecton for use in a StreamFeatureView.
Attributes
| Name | Data Type | Description |
|---|---|---|
| data_delay | Optional[datetime.timedelta] | Returns the duration that materialization jobs wait after the batch_schedule before starting, typically to ensure that all data has landed. |
| description | Optional[str] | Returns the description of the Tecton object. |
| id | str | Returns the unique id of the Tecton object. |
| info | | |
| is_streaming | | Deprecated. |
| name | str | Returns the name of the Tecton object. |
| owner | Optional[str] | Returns the owner of the Tecton object. |
| tags | Dict[str, str] | Returns the tags of the Tecton object. |
| workspace | Optional[str] | Returns the workspace that this Tecton object belongs to. |
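For illustration, a minimal sketch of reading a few of these attributes, assuming the transactions_stream object declared in the Example below:
```python
# Assumes transactions_stream is the StreamSource declared in the Example below.
print(transactions_stream.name)         # "transactions_stream"
print(transactions_stream.description)  # None unless a description was passed to the constructor
print(transactions_stream.tags)         # tags passed at declaration, if any
print(transactions_stream.workspace)    # the owning workspace (Optional[str])
```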
Methods
| Name | Description |
|---|---|
| __init__(...) | Creates a new StreamSource. |
| get_columns() | Returns the column names of the data source’s streaming schema. |
| get_dataframe(...) | Returns the data in this Data Source as a Tecton DataFrame. |
| start_stream_preview(...) | Starts a streaming job to write incoming records from this DS’s stream to a temporary table with a given name. |
| summary() | Displays a human readable summary of this Data Source. |
| validate() | Validate this Tecton object and its dependencies (if any). |
__init__(...)
Creates a new StreamSource.
Parameters
- name (str) – A unique name of the DataSource.
- description (Optional[str]) – A human-readable description. (Default: None)
- tags (Optional[Dict[str, str]]) – Tags associated with this Tecton Object (key-value pairs of arbitrary metadata). (Default: None)
- owner (Optional[str]) – Owner name (typically the email of the primary maintainer). (Default: None)
- prevent_destroy (bool) – If True, this Tecton object will be blocked from being deleted or re-created (i.e. a destructive update) during tecton plan/apply. To remove or update this object, prevent_destroy must first be set to False via a separate tecton apply. prevent_destroy can be used to prevent accidental changes such as inadvertently deleting a Feature Service used in production or recreating a Feature View that triggers expensive rematerialization jobs. prevent_destroy also blocks changes to dependent Tecton objects that would trigger a recreate of the tagged object, e.g. if prevent_destroy is set on a Feature Service, that will also prevent deletions or re-creates of Feature Views used in that service. prevent_destroy is only enforced in live (i.e. non-dev) workspaces. (Default: False)
- batch_config (Union[FileConfig, HiveConfig, RedshiftConfig, SnowflakeConfig, SparkBatchConfig]) – BatchConfig object containing the configuration of the Batch Data Source that backs this Tecton Stream Source.
- stream_config (Union[KinesisConfig, KafkaConfig, SparkStreamConfig]) – StreamConfig object containing the configuration of the Stream Data Source that backs this Tecton Stream Source.
Example
```python
import pyspark
from tecton import KinesisConfig, HiveConfig, StreamSource
from datetime import timedelta


# Define our deserialization raw stream translator
def raw_data_deserialization(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    from pyspark.sql.functions import col, from_json, from_utc_timestamp
    from pyspark.sql.types import StructType, StringType

    payload_schema = (
        StructType()
        .add("amount", StringType(), False)
        .add("isFraud", StringType(), False)
        .add("timestamp", StringType(), False)
    )

    return (
        df.selectExpr("cast (data as STRING) jsonData")
        .select(from_json("jsonData", payload_schema).alias("payload"))
        .select(
            col("payload.amount").cast("long").alias("amount"),
            col("payload.isFraud").cast("long").alias("isFraud"),
            from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
        )
    )


# Declare a StreamSource with both a batch_config and a stream_config as parameters
# See the API documentation for both BatchConfig and StreamConfig
transactions_stream = StreamSource(
    name="transactions_stream",
    stream_config=KinesisConfig(
        stream_name="transaction_events",
        region="us-west-2",
        initial_stream_position="latest",
        watermark_delay_threshold=timedelta(minutes=30),
        timestamp_field="timestamp",
        post_processor=raw_data_deserialization,  # deserialization function defined above
        options={"roleArn": "arn:aws:iam::472542229217:role/demo-cross-account-kinesis-ro"},
    ),
    batch_config=HiveConfig(
        database="demo_fraud",
        table="transactions",
        timestamp_field="timestamp",
    ),
)
```
get_columns()
Returns the column names of the data source’s streaming schema.
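For illustration, a small sketch assuming the transactions_stream source from the Example above has been declared and validated:
```python
# Column names from the streaming schema (derived from the post_processor's output).
columns = transactions_stream.get_columns()
print(columns)  # e.g. ['amount', 'isFraud', 'timestamp']
```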
get_dataframe(...)
Returns the data in this Data Source as a Tecton DataFrame.
Parameters
- start_time (Optional[datetime]) – The interval start time from when we want to retrieve source data. If no timezone is specified, will default to using UTC. Can only be defined if apply_translator is True. (Default: None)
- end_time (Optional[datetime]) – The interval end time until when we want to retrieve source data. If no timezone is specified, will default to using UTC. Can only be defined if apply_translator is True. (Default: None)
- apply_translator (bool) – If True, the transformation specified by post_processor will be applied to the dataframe for the data source. apply_translator is not applicable to batch sources configured with spark_batch_config because it does not have a post_processor. (Default: True)
Returns
- A Tecton DataFrame containing the data source’s raw or translated source data.
Raises
- TectonValidationError – If apply_translator is False, but start_time or end_time filters are passed in.
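Below is a minimal usage sketch, assuming the transactions_stream source from the Example above and illustrative time bounds; the result is converted with the Tecton DataFrame’s to_spark() method for inspection:
```python
from datetime import datetime

# Translated source data for a bounded window (apply_translator defaults to True).
df = transactions_stream.get_dataframe(
    start_time=datetime(2023, 1, 1),
    end_time=datetime(2023, 1, 2),
)
df.to_spark().show()

# Raw, untranslated data; time filters are not allowed when apply_translator=False.
raw_df = transactions_stream.get_dataframe(apply_translator=False)
```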
start_stream_preview(...)
Starts a streaming job to write incoming records from this DS’s stream to a temporary table with a given name.
After records have been written to the table, they can be queried using spark.sql(). If run in a Databricks notebook, Databricks will also automatically visualize the number of incoming records.
This is a testing method, most commonly used to verify that a StreamDataSource is correctly receiving streaming events. Note that the table will grow without bound, so it is only useful for debugging in notebooks.
Parameters
- table_name (str) – The name of the temporary table that this method will write to.
- apply_translator (bool) – Whether to apply this data source’s raw_stream_translator. When True, the translated data will be written to the table. When False, the raw, untranslated data will be written. apply_translator is not applicable to stream sources configured with spark_stream_config because it does not have a post_processor. (Default: True)
- option_overrides (Optional[Dict[str, str]]) – A dictionary of Spark readStream options that will override any readStream options set by the data source. Can be used to configure behavior only for the preview, e.g. setting startingOffsets: latest to preview only the most recent events in a Kafka stream. (Default: None)
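A minimal notebook sketch, assuming the transactions_stream source from the Example above and an active Spark session named spark (as in a Databricks notebook); the table name is illustrative:
```python
# Write incoming stream records to a temporary table for inspection.
transactions_stream.start_stream_preview(table_name="transactions_stream_preview")

# Once some records have accumulated, query them with spark.sql().
spark.sql("SELECT * FROM transactions_stream_preview LIMIT 10").show()
```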
summary()
Displays a human readable summary of this Data Source.
validate()
Validate this Tecton object and its dependencies (if any).
Validation performs most of the same checks and operations as tecton plan.
- Check for invalid object configurations, e.g. setting conflicting fields.
- For Data Sources and Feature Views, test query code and derive schemas, e.g. test that a Data Source’s specified s3 path exists or that a Feature View’s SQL code executes and produces supported feature data types.
Objects already applied to Tecton do not need to be re-validated on retrieval
(e.g. fv = tecton.get_workspace('prod').get_feature_view('my_fv')) since they
have already been validated during tecton plan. Locally defined objects (e.g.
my_ds = BatchSource(name="my_ds", ...)) may need to be validated before some
of their methods can be called, e.g.
my_feature_view.get_historical_features().
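As a sketch of the local-definition flow above (assuming the transactions_stream object declared in the Example section), validation precedes the data access methods:
```python
# Locally defined StreamSource: validate before calling data access methods.
transactions_stream.validate()

# After validation, methods like get_dataframe() can be used.
df = transactions_stream.get_dataframe()
```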