tecton.spark_stream_config
Summary
Declare a `tecton.SparkStreamConfig` for configuring a Stream Source with a Data Source Function. The function takes in a `SparkSession` and returns a streaming `DataFrame`.
Example
Example defining a Data Source Function using `spark_stream_config`:
from tecton import spark_stream_config
def raw_data_deserialization(df):
    """Parse raw JSON stream records into typed transaction columns.

    The ``data`` column of each record is cast to a string, parsed as JSON
    using ``payload_schema``, and flattened into top-level columns.

    Args:
        df: Streaming DataFrame with a ``data`` column holding the JSON
            payload (e.g. the DataFrame loaded from the Kinesis reader).

    Returns:
        DataFrame with columns ``user_id``, ``transaction_id`` and
        ``category`` (strings), ``amt`` (double) and ``timestamp``
        (timestamp).
    """
    # Import only what is used; the function-scope imports keep the function
    # self-contained.
    from pyspark.sql.functions import col, from_json, from_utc_timestamp
    from pyspark.sql.types import StringType, StructField, StructType

    # Every field is read as a string; ``amt`` and ``timestamp`` are converted
    # to proper types in the projection below.
    payload_schema = StructType(
        [
            StructField("user_id", StringType(), False),
            StructField("transaction_id", StringType(), False),
            StructField("category", StringType(), False),
            StructField("amt", StringType(), False),
            StructField("timestamp", StringType(), False),
        ]
    )

    return (
        df.selectExpr("cast (data as STRING) jsonData")
        .select(from_json("jsonData", payload_schema).alias("payload"))
        .select(
            col("payload.user_id").alias("user_id"),
            col("payload.transaction_id").alias("transaction_id"),
            col("payload.category").alias("category"),
            col("payload.amt").cast("double").alias("amt"),
            # NOTE: shifting from UTC to UTC does not move the instant, so this
            # effectively just converts the string into a timestamp column.
            from_utc_timestamp("payload.timestamp", "UTC").alias("timestamp"),
        )
    )
@spark_stream_config()
def kinesis_data_source_function(spark):
    """Return a streaming DataFrame of deserialized Kinesis records.

    Args:
        spark: The SparkSession passed to the Data Source Function.

    Returns:
        Streaming DataFrame produced by ``raw_data_deserialization``, with a
        24-hour watermark on its ``timestamp`` column.
    """
    # ``timedelta`` was previously used without being imported (NameError).
    from datetime import timedelta

    # Placeholder values in angle brackets must be replaced with real settings.
    options = {
        "streamName": "<stream name>",
        "roleArn": "<role ARN>",
        "region": "<region>",
        "shardFetchInterval": "30s",
        "initialPosition": "latest",
    }
    reader = spark.readStream.format("kinesis").options(**options)
    df = raw_data_deserialization(reader.load())

    # ``timedelta.seconds`` is only the sub-day remainder, which is 0 for
    # 24 hours; ``total_seconds()`` yields the intended 86400.
    watermark = "{} seconds".format(int(timedelta(hours=24).total_seconds()))
    return df.withWatermark("timestamp", watermark)