Creating and Testing a Streaming Data Source
This guide shows you how to create a Tecton StreamSource that reads data from Kafka or Kinesis. The StreamSource can later be used by a StreamFeatureView, which generates feature values from the raw data retrieved by the StreamSource.
Using a notebook, you will build the StreamSource incrementally. When the StreamSource is complete, you will apply it in a Tecton repo.
Before proceeding, get a foundational understanding of stream sources and what they contain.
The following applies to this guide:
- The steps apply to both Databricks and EMR, unless noted otherwise.
- The term "batch source" refers to the batch source that backs a stream source; the batch source contains the stream's historical data. If you are using Kinesis as your stream source, you can use Kinesis Data Firehose to send the streaming data to the batch source.
- The transactions table in the demo_fraud_v2 database is used as an example batch source. transactions_stream is used as an example stream source.
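For orientation, the finished StreamSource you will build might look roughly like the sketch below. This is only a hedged illustration: the exact class and parameter names depend on your Tecton SDK version, and raw_stream_translator is a hypothetical post-processor whose parsing logic is what you will work out in this guide.
from tecton import StreamSource, KinesisConfig, HiveConfig


def raw_stream_translator(df):
    # Hypothetical post-processor: parse raw stream records into typed columns.
    # The parsing logic is what you will build and test in the notebook below.
    return df  # placeholder


transactions_stream = StreamSource(
    name="transactions_stream",
    stream_config=KinesisConfig(  # or KafkaConfig, depending on your stream
        stream_name="<stream name>",
        region="<region>",
        post_processor=raw_stream_translator,
        timestamp_field="timestamp",
        initial_stream_position="latest",
    ),
    batch_config=HiveConfig(
        database="demo_fraud_v2",
        table="transactions",
        timestamp_field="timestamp",
    ),
)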
Create and set up a new notebook
On your data platform (Databricks or EMR), create a new notebook.
- Databricks: Follow these instructions to connect a notebook to your Databricks cluster.
- EMR: Follow these instructions. Note that specific JAR files need to be installed to use a notebook with Kinesis and Kafka.
Import modules needed to run the notebook:
import tecton
import pandas
from datetime import datetime, timedelta
from pyspark.sql.functions import col, from_json, from_utc_timestamp, when
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    DoubleType,
    TimestampType,
    BooleanType,
    IntegerType,
    LongType,
)
import dateutil.parser
import tempfile
Verify you can read data directly from the stream source and its corresponding batch source
Read data directly from the batch source
batch_data_source = "demo_fraud_v2.transactions"
batch_data = spark.sql(f"SELECT * FROM {batch_data_source} LIMIT 10")
batch_data.show()
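It can also help to inspect the batch table's schema at this point, since the stream schema you define below should produce columns with matching names and types. This check uses only standard Spark:
# Print the batch source's schema so it can be compared against the stream
# schema defined later in this guide.
spark.table(batch_data_source).printSchema()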
Read data directly from the stream source
The following helper functions are used when reading data directly from the stream source:
- write_streaming_data_to_table writes data to a table as it is read from the stream.
- query_streaming_table queries the data in the table and displays the output.
def write_streaming_data_to_table(stream, stream_output_table):
    # Write the stream to an in-memory table (named after the query) so it can
    # be inspected with SQL; the checkpoint is kept in a temporary directory.
    with tempfile.TemporaryDirectory() as d:
        (
            stream.writeStream.format("memory")
            .queryName(stream_output_table)
            .option("checkpointLocation", d)
            .outputMode("append")
            .start()
        )


def query_streaming_table(stream_output_table):
    # Query the in-memory table populated by the streaming query above.
    stream_data = spark.sql("SELECT * FROM " + stream_output_table + " LIMIT 10")
    stream_data.show()
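Because write_streaming_data_to_table starts a streaming query without keeping a handle to it, it is also convenient to define a helper that stops every active streaming query on the Spark session. The sketch below is not part of the original helpers; call it once you finish the verification steps at the end of this guide so the test stream does not keep running in your notebook.
def stop_all_streaming_queries():
    # Stop every streaming query that is currently active on this Spark session.
    for query in spark.streams.active:
        query.stop()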
Define the stream schema and read the raw stream into a DataFrame. Use the variant below that matches your stream type and platform:
- Kinesis (Databricks)
- Kinesis (EMR)
- Kafka
# Kinesis (Databricks)
stream_schema = StructType(
    [
        StructField("user_id", StringType(), False),
        StructField("transaction_id", StringType(), False),
        StructField("category", StringType(), False),
        StructField("amount", DoubleType(), False),
        StructField("is_fraud", LongType(), False),
        StructField("merchant", StringType(), False),
        StructField("merchant_lat", DoubleType(), False),
        StructField("merchant_long", DoubleType(), False),
        StructField("timestamp", StringType(), False),
    ]
)

binary_stream = (
    spark.readStream.format("kinesis")
    .option("streamName", "<stream name>")
    .option("region", "<region>")
    .option("roleArn", "<role ARN>")
    .option("initialPosition", "earliest")
    .load()
)

# The Kinesis source returns each record as binary; decode it and parse the JSON.
json_stream = (
    binary_stream.selectExpr("cast (data as STRING) jsonData")
    .select(from_json("jsonData", stream_schema).alias("s"))
    .select("s.*")
)
# Kinesis (EMR)
stream_schema = StructType(
    [
        StructField("user_id", StringType(), False),
        StructField("transaction_id", StringType(), False),
        StructField("category", StringType(), False),
        StructField("amount", DoubleType(), False),
        StructField("is_fraud", LongType(), False),
        StructField("merchant", StringType(), False),
        StructField("merchant_lat", DoubleType(), False),
        StructField("merchant_long", DoubleType(), False),
        StructField("timestamp", StringType(), False),
    ]
)

binary_stream = (
    spark.readStream.format("kinesis")
    .option("streamName", "<stream name>")
    .option("awsSTSRoleARN", "<role ARN>")
    .option("awsSTSSessionName", "tecton-materialization")
    .option("startingPosition", "earliest")
    .option("kinesis.client.describeShardInterval", "30s")
    .option("endpointUrl", "https://kinesis.<region>.amazonaws.com")
    .load()
)

# The Kinesis source returns each record as binary; decode it and parse the JSON.
json_stream = (
    binary_stream.selectExpr("cast (data as STRING) jsonData")
    .select(from_json("jsonData", stream_schema).alias("s"))
    .select("s.*")
)
# Kafka
stream_schema = StructType(
    [
        StructField("user_id", StringType(), False),
        StructField("transaction_id", StringType(), False),
        StructField("category", StringType(), False),
        StructField("amount", DoubleType(), False),
        StructField("is_fraud", LongType(), False),
        StructField("merchant", StringType(), False),
        StructField("merchant_lat", DoubleType(), False),
        StructField("merchant_long", DoubleType(), False),
        StructField("timestamp", StringType(), False),
    ]
)

binary_stream = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "<URL(s) of Kafka broker(s)>")
    .option("subscribe", "<Kafka topic name>")
    .option("startingOffsets", "earliest")
    .load()
)
# Additional options may be needed depending on your Kafka connectivity method,
# such as TLS or SASL. For example, if connecting using TLS, specify
# kafka.security.protocol (with the value "SSL"), kafka.ssl.keystore.password,
# kafka.ssl.truststore.location, and kafka.ssl.keystore.location.

# The Kafka value column is binary; decode it and parse the JSON.
json_stream = (
    binary_stream.selectExpr("cast (value as STRING) jsonData")
    .select(from_json("jsonData", stream_schema).alias("s"))
    .select("s.*")
)
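As noted in the comment above, connecting to Kafka over TLS requires additional reader options. The sketch below shows how those options might be passed; the keystore and truststore locations and the password are placeholders you would replace with values for your Kafka setup.
# Example TLS configuration for the Kafka reader; locations and password are placeholders.
binary_stream_tls = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "<URL(s) of Kafka broker(s)>")
    .option("subscribe", "<Kafka topic name>")
    .option("startingOffsets", "earliest")
    .option("kafka.security.protocol", "SSL")
    .option("kafka.ssl.keystore.location", "<path to keystore.jks>")
    .option("kafka.ssl.keystore.password", "<keystore password>")
    .option("kafka.ssl.truststore.location", "<path to truststore.jks>")
    .load()
)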
Write data to a table as it is read from the stream:
The following command continuously reads data directly from the stream, so it should only be run for a short period of time.
write_streaming_data_to_table(json_stream, "stream_output_table_json")
Query the data in the table and display the output:
query_streaming_table("stream_output_table_json")
If no data is returned after running the previous command, run the command again after a short period of time.
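Finally, note that the stream schema above reads timestamp as a string. When you build the StreamSource's post-processing logic, that column is typically converted to a real timestamp; the from_utc_timestamp import at the top of the notebook is there for that kind of step. A minimal sketch of the conversion, assuming the raw values are parseable timestamp strings in UTC:
# Sketch only: cast the raw string timestamp and interpret it as UTC.
typed_stream = json_stream.withColumn(
    "timestamp", from_utc_timestamp(col("timestamp").cast("timestamp"), "UTC")
)
typed_stream.printSchema()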