tecton.StreamSource
Summary
A Tecton StreamSource, used to unify stream and batch data into Tecton for use in a StreamFeatureView.
Attributes
Name | Data Type | Description |
---|---|---|
created_at | Optional[datetime.datetime] | The time that this Tecton object was created or last updated. |
data_delay | Optional[datetime.timedelta] | Returns the duration that materialization jobs wait after the batch_schedule before starting, typically to ensure that all data has landed. |
defined_in | Optional[str] | The repo filename where this object was declared. |
description | Optional[str] | Returns the description of the Tecton object. |
id | str | Returns the unique id of the Tecton object. |
info | | |
name | str | Returns the name of the Tecton object. |
owner | Optional[str] | Returns the owner of the Tecton object. |
tags | Dict[str,str] | Returns the tags of the Tecton object. |
workspace | Optional[str] | Returns the workspace that this Tecton object belongs to. |
options | Optional[Dict[str, str]] | A map of additional stream data source options. |
Methods
Name | Description |
---|---|
__init__(...) | Creates a new StreamSource. |
get_columns() | Returns the column names of the data source’s streaming schema. |
get_dataframe(...) | Returns the data in this Data Source as a Tecton DataFrame. |
start_stream_preview(...) | Starts a streaming job to write incoming records from this DS’s stream to a temporary table with a given name. |
summary() | Displays a human readable summary of this Data Source. |
validate() | Validate this Tecton object and its dependencies (if any). |
__init__(...)
Creates a new StreamSource.
Parameters
- name (str) – A unique name of the DataSource.
- description (Optional[str]) – A human-readable description. (Default: None)
- tags (Optional[Dict[str, str]]) – Tags associated with this Tecton Object (key-value pairs of arbitrary metadata). (Default: None)
- owner (Optional[str]) – Owner name (typically the email of the primary maintainer). (Default: None)
- prevent_destroy (bool) – If True, this Tecton object will be blocked from being deleted or re-created (i.e. a destructive update) during tecton plan/apply. To remove or update this object, prevent_destroy must first be set to False via the same tecton apply or a separate tecton apply. prevent_destroy can be used to prevent accidental changes such as inadvertently deleting a Feature Service used in production or recreating a Feature View that triggers expensive rematerialization jobs. prevent_destroy also blocks changes to dependent Tecton objects that would trigger a re-create of the tagged object, e.g. if prevent_destroy is set on a Feature Service, that will also prevent deletions or re-creates of Feature Views used in that service. prevent_destroy is only enforced in live (i.e. non-dev) workspaces. (Default: False)
- batch_config (Union[FileConfig, HiveConfig, RedshiftConfig, SnowflakeConfig, SparkBatchConfig]) – BatchConfig object containing the configuration of the Batch Data Source that backs this Tecton Stream Source.
- stream_config (Union[KinesisConfig, KafkaConfig, SparkStreamConfig]) – StreamConfig object containing the configuration of the Stream Data Source that backs this Tecton Stream Source.
Example

```python
import pyspark
from tecton import KinesisConfig, HiveConfig, StreamSource
from datetime import timedelta


# Define our deserialization raw stream translator
def raw_data_deserialization(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    from pyspark.sql.functions import col, from_json, from_utc_timestamp
    from pyspark.sql.types import StructType, StringType

    payload_schema = (
        StructType()
        .add("amount", StringType(), False)
        .add("isFraud", StringType(), False)
        .add("timestamp", StringType(), False)
    )
    return (
        df.selectExpr("cast (data as STRING) jsonData")
        .select(from_json("jsonData", payload_schema).alias("payload"))
        .select(
            col("payload.amount").cast("long").alias("amount"),
            col("payload.isFraud").cast("long").alias("isFraud"),
            from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
        )
    )


# Declare a StreamSource with both a batch_config and a stream_config as parameters
# See the API documentation for both BatchConfig and StreamConfig
transactions_stream = StreamSource(
    name="transactions_stream",
    stream_config=KinesisConfig(
        stream_name="transaction_events",
        region="us-west-2",
        initial_stream_position="latest",
        watermark_delay_threshold=timedelta(minutes=30),
        timestamp_field="timestamp",
        post_processor=raw_data_deserialization,  # deserialization function defined above
        options={"roleArn": "arn:aws:iam::472542229217:role/demo-cross-account-kinesis-ro"},
    ),
    batch_config=HiveConfig(
        database="demo_fraud",
        table="transactions",
        timestamp_field="timestamp",
    ),
)
```
get_columns()
Returns the column names of the data source’s streaming schema.
get_dataframe(...)
Returns the data in this Data Source as a Tecton DataFrame.
Parameters
- start_time (Optional[datetime]) – The interval start time from when we want to retrieve source data. If no timezone is specified, defaults to UTC. Can only be defined if apply_translator is True. (Default: None)
- end_time (Optional[datetime]) – The interval end time until when we want to retrieve source data. If no timezone is specified, defaults to UTC. Can only be defined if apply_translator is True. (Default: None)
- apply_translator (bool) – If True, the transformation specified by post_processor will be applied to the dataframe for the data source. apply_translator is not applicable to batch sources configured with spark_batch_config because it does not have a post_processor. (Default: True)
Returns
- A Tecton DataFrame containing the data source’s raw or translated source data.
Raises
- TectonValidationError – If apply_translator is False, but start_time or end_time filters are passed in.
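For instance, reading a bounded window of translated source data might look like the following sketch (the workspace and source names are assumptions, and the Tecton calls require a Tecton-connected Spark environment):

```python
from datetime import datetime, timezone

# A one-day window; timezone-aware datetimes make the UTC default explicit.
start = datetime(2023, 5, 1, tzinfo=timezone.utc)
end = datetime(2023, 5, 2, tzinfo=timezone.utc)

# In a Tecton-connected environment (hypothetical workspace/source names):
# import tecton
# ds = tecton.get_workspace("prod").get_data_source("transactions_stream")
# spark_df = ds.get_dataframe(start_time=start, end_time=end).to_spark()
```

Because apply_translator defaults to True, the returned data would have the post_processor applied; pass apply_translator=False (without time filters) to see the raw records instead.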
start_stream_preview(...)
Starts a streaming job to write incoming records from this DS’s stream to a temporary table with a given name.
After records have been written to the table, they can be queried using
spark.sql(). If run in a Databricks notebook, Databricks will also
automatically visualize the number of incoming records.
This is a testing method, most commonly used to verify that a StreamSource is correctly receiving streaming events. Note that the table will grow without bound, so this method is only suitable for debugging in notebooks.
Parameters
- table_name (str) – The name of the temporary table that this method will write to.
- apply_translator (bool) – Whether to apply this data source’s raw_stream_translator. When True, the translated data will be written to the table. When False, the raw, untranslated data will be written. apply_translator is not applicable to stream sources configured with spark_stream_config because it does not have a post_processor. (Default: True)
- option_overrides (Optional[Dict[str, str]]) – A dictionary of Spark readStream options that will override any readStream options set by the data source. Can be used to configure behavior only for the preview, e.g. setting startingOffsets: latest to preview only the most recent events in a Kafka stream. (Default: None)
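A typical notebook debugging flow might look like the following sketch (the table and source names are hypothetical, and spark is the notebook's Spark session):

```python
# Override readStream options only for this preview run; here we start from
# the latest offsets so only newly arriving events appear in the table.
# Keys and values are plain strings, matching Spark's readStream options.
option_overrides = {"startingOffsets": "latest"}

# In a Tecton-connected notebook (hypothetical names):
# transactions_stream.start_stream_preview(
#     table_name="transactions_preview",
#     apply_translator=True,
#     option_overrides=option_overrides,
# )
# spark.sql("SELECT COUNT(*) FROM transactions_preview").show()
```

Remember to stop the streaming job when finished, since the temporary table keeps growing for as long as the preview runs.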
summary()
Displays a human readable summary of this Data Source.
validate()
Validate this Tecton object and its dependencies (if any).
Validation performs most of the same checks and operations as tecton plan:
- Checks for invalid object configurations, e.g. setting conflicting fields.
- For Data Sources and Feature Views, tests query code and derives schemas, e.g. tests that a Data Source’s specified S3 path exists or that a Feature View’s SQL code executes and produces supported feature data types.
Objects already applied to Tecton do not need to be re-validated on retrieval (e.g. fv = tecton.get_workspace('prod').get_feature_view('my_fv')) since they have already been validated during tecton plan. Locally defined objects (e.g. my_ds = BatchSource(name="my_ds", ...)) may need to be validated before some of their methods can be called, e.g. my_feature_view.get_historical_features().
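The distinction above can be summarized as a small sketch (the surrounding Tecton calls, shown in comments, use hypothetical names and require a Tecton-connected environment):

```python
# Whether validate() is needed depends on how the object was obtained:
# - retrieved from an applied workspace -> already validated by tecton plan
# - defined locally in a notebook      -> validate() before data-access calls
needs_validation = {
    "retrieved_from_workspace": False,  # fv = tecton.get_workspace("prod").get_feature_view("my_fv")
    "defined_locally": True,            # my_ds = StreamSource(name="my_ds", ...); my_ds.validate()
}
```

Calling a data-access method such as get_dataframe() on an unvalidated, locally defined object is the typical trigger for running validate() first.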