tecton.StreamSource
Summary​
A Tecton StreamSource, used to unify stream and batch data into Tecton for use in a StreamFeatureView.
Attributes​
Name | Data Type | Description |
---|---|---|
created_at | Optional[datetime.datetime] | The time that this Tecton object was created or last updated. |
data_delay | Optional[datetime.timedelta] | Returns the duration that materialization jobs wait after the batch_schedule before starting, typically to ensure that all data has landed. |
defined_in | Optional[str] | The repo filename where this object was declared. |
description | Optional[str] | Returns the description of the Tecton object. |
id | str | Returns the unique id of the Tecton object. |
info | ||
name | str | Returns the name of the Tecton object. |
owner | Optional[str] | Returns the owner of the Tecton object. |
tags | Dict[str,str] | Returns the tags of the Tecton object. |
workspace | Optional[str] | Returns the workspace that this Tecton object belongs to. |
options | Optional[Dict[str, str]] | A map of additional stream data source options. |
Methods​
Name | Description |
---|---|
__init__(...) | Creates a new StreamSource. |
get_columns() | Returns the column names of the data source’s streaming schema. |
get_dataframe(...) | Returns the data in this Data Source as a Tecton DataFrame. |
start_stream_preview(...) | Starts a streaming job to write incoming records from this DS’s stream to a temporary table with a given name. |
ingest | Ingests a single event into the Tecton Online Ingest API. |
summary() | Displays a human readable summary of this Data Source. |
validate() | Validate this Tecton object and its dependencies (if any). |
__init__(...)​
Creates a new StreamSource.
Parameters​
-
name
(str
) – A unique name of the DataSource. -
description
(Optional
[str
]) – A human-readable description. (Default:None
) -
tags
(Optional
[Dict
[str
,str
]]) – Tags associated with this Tecton Object (key-value pairs of arbitrary metadata). (Default:None
) -
owner
(Optional
[str
]) – Owner name (typically the email of the primary maintainer). (Default:None
) -
prevent_destroy
(bool
) – If True, this Tecton object will be blocked from being deleted or re-created (i.e. a destructive update) during tecton plan/apply. To remove or update this object, prevent_destroy must be first set to False via the same tecton apply or a separate tecton apply. prevent_destroy can be used to prevent accidental changes such as inadvertently deleting a Feature Service used in production or recreating a Feature View that triggers expensive rematerialization jobs. prevent_destroy also blocks changes to dependent Tecton objects that would trigger a recreate of the tagged object, e.g. if prevent_destroy is set on a Feature Service, that will also prevent deletions or re-creates of Feature Views used in that service. prevent_destroy is only enforced in live (i.e. non-dev) workspaces. (Default:False
) -
batch_config
– BatchConfig object containing the configuration of the Batch Data Source that backs this Tecton Stream Source. -
stream_config
– StreamConfig object containing the configuration of the Stream Data Source that backs this Tecton Stream Source. -
schema
- A schema for the StreamSource, must be provided for Stream Sources with a Push Config
Example​
import pyspark
from tecton import KinesisConfig, HiveConfig, StreamSource
from datetime import timedelta
# Define our deserialization raw stream translator
def raw_data_deserialization(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
from pyspark.sql.functions import col, from_json, from_utc_timestamp
from pyspark.sql.types import StructType, StringType
payload_schema = (
StructType()
.add("amount", StringType(), False)
.add("isFraud", StringType(), False)
.add("timestamp", StringType(), False)
)
return (
df.selectExpr("cast (data as STRING) jsonData")
.select(from_json("jsonData", payload_schema).alias("payload"))
.select(
col("payload.amount").cast("long").alias("amount"),
col("payload.isFraud").cast("long").alias("isFraud"),
from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
)
)
# Declare a StreamSource with both a batch_config and a stream_config as parameters
# See the API documentation for both BatchConfig and StreamConfig
transactions_stream = StreamSource(
name="transactions_stream",
stream_config=KinesisConfig(
stream_name="transaction_events",
region="us-west-2",
initial_stream_position="latest",
watermark_delay_threshold=timedelta(minutes=30),
timestamp_field="timestamp",
post_processor=raw_data_deserialization, # deserialization function defined above
options={"roleArn": "arn:aws:iam::472542229217:role/demo-cross-account-kinesis-ro"},
),
batch_config=HiveConfig(
database="demo_fraud",
table="transactions",
timestamp_field="timestamp",
),
)
-
batch_config
(Union
[FileConfig
,HiveConfig
,RedshiftConfig
,SnowflakeConfig
,SparkBatchConfig
]) – BatchConfig object containing the configuration of the Batch Data Source that backs this Tecton Stream Source. -
stream_config
(Union
[KinesisConfig
,KafkaConfig
,SparkStreamConfig
,PushConfig
]) – Stream Configuration object containing the configuration of the Stream Data Source that backs this Tecton Stream Source.
get_columns()​
Returns the column names of the data source’s streaming schema.
get_dataframe(...)​
Returns the data in this Data Source as a Tecton DataFrame.
Parameters​
-
start_time
(Optional
[datetime
]) – The interval start time from when we want to retrieve source data. If no timezone is specified, will default to using UTC. Can only be defined ifapply_translator
is True. (Default:None
) -
end_time
(Optional
[datetime
]) – The interval end time until when we want to retrieve source data. If no timezone is specified, will default to using UTC. Can only be defined ifapply_translator
is True. (Default:None
) -
apply_translator
(bool
) – If True, the transformation specified bypost_processor
will be applied to the dataframe for the data source.apply_translator
is not applicable to batch sources configured withspark_batch_config
because it does not have apost_processor
. (Default:True
)
Returns​
- A Tecton DataFrame containing the data source’s raw or translated source data.
Raises​
TectonValidationError
– If apply_translator
is False, but start_time
or
end_time
filters are passed in.
start_stream_preview(...)​
Starts a streaming job to write incoming records from this DS’s stream to a temporary table with a given name.
After records have been written to the table, they can be queried using
spark.sql()
. If ran in a Databricks notebook, Databricks will also
automatically visualize the number of incoming records.
This is a testing method, most commonly used to verify a StreamDataSource is correctly receiving streaming events. Note that the table will grow infinitely large, so this is only really useful for debugging in notebooks.
Parameters​
-
table_name
(str
) – The name of the temporary table that this method will write to. -
apply_translator
(bool
) – Whether to apply this data source’sraw_stream_translator
. When True, the translated data will be written to the table. When False, the raw, untranslated data will be written.apply_translator
is not applicable to stream sources configured withspark_stream_config
because it does not have apost_processor
. (Default:True
) -
checkpoint_dir
(Optional
[str
]) – A root directory where a temporary folder will be created and used by the streaming job for checkpointing. Primarily intended for use with Databricks Unity Catalog Shared Access Mode Clusters. If specified, the environment should have write permission for the specified directory. If not provided, a temporary directory will be created using the default file system. (Default:None
) -
option_overrides
(Optional
[Dict
[str
,str
]]) – A dictionary of Spark readStream options that will override any readStream options set by the data source. Can be used to configure behavior only for the preview, e.g. settingstartingOffsets:latest
to preview only the most recent events in a Kafka stream. (Default:None
)
ingest(...)​
Ingests a single event into the Tecton Online Ingest API. This utility can only be used with Stream Sources with a Push Config.
Parameters​
event
(dict
) - A dictionary representing a single event to be ingested. The schema of the dictionary must match the schema defined in the Stream Source.dry_run
(bool
) - If True, the ingest request will be validated, but the event will not be materialized to the online store. It is set to False by default.
summary()​
Displays a human readable summary of this Data Source.
validate()​
Validate this Tecton object and its dependencies (if any).
Validation performs most of the same checks and operations as tecton plan
.
-
Check for invalid object configurations, e.g. setting conflicting fields.
-
For Data Sources and Feature Views, test query code and derive schemas. e.g. test that a Data Source’s specified s3 path exists or that a Feature View’s SQL code executes and produces supported feature data types.
Objects already applied to Tecton do not need to be re-validated on retrieval
(e.g. fv = tecton.get_workspace('prod').get_feature_view('my_fv')
) since they
have already been validated during tecton plan
. Locally defined objects (e.g.
my_ds = BatchSource(name="my_ds", ...)
) may need to be validated before some
of their methods can be called, e.g.
my_feature_view.get_historical_features()
.