Testing Stream Features
Import libraries and select your workspace
import tecton
import pandas
from datetime import datetime
ws = tecton.get_workspace("prod")
Load a Stream Feature View
fv = ws.get_feature_view("last_transaction_amount_sql")
fv.summary()
Start a Streaming Job to view real-time streaming features
This section only applies to Spark streaming features. These methods must be run on a Spark cluster.
The run_stream method starts a Spark Structured Streaming job and writes the results to the specified temporary table.
fv.run_stream(output_temp_table="output_temp_table")
The temporary table can then be queried to view real-time results. Run this code in a separate notebook cell.
# Query the result from the streaming output table.
display(spark.sql("SELECT * FROM output_temp_table ORDER BY timestamp DESC LIMIT 5"))
| | user_id | timestamp | amt |
|---|---|---|---|
| 0 | user_469998441571 | 2022-06-07 18:31:24 | 54.46 |
| 1 | user_460877961787 | 2022-06-07 18:31:21 | 73.02 |
| 2 | user_650387977076 | 2022-06-07 18:31:20 | 46.05 |
| 3 | user_699668125818 | 2022-06-07 18:31:17 | 59.24 |
| 4 | user_394495759023 | 2022-06-07 18:31:15 | 11.38 |
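A Structured Streaming query typically keeps running until it is stopped, so it can be worth cleaning up once testing is done. The sketch below is one way to do that, assuming the job started by run_stream is registered as an ordinary Structured Streaming query on the notebook's spark session. spark.streams.active, StreamingQuery.name, and StreamingQuery.stop() are standard PySpark APIs; stop_active_streams is a hypothetical helper name and is not part of Tecton.

```python
def stop_active_streams(spark):
    """Stop every active Structured Streaming query on this Spark session.

    Returns the names of the stopped queries (None for unnamed queries).
    Caution: this stops ALL active queries on the session, not only the
    one started for testing.
    """
    stopped = []
    for query in spark.streams.active:  # list of StreamingQuery objects
        stopped.append(query.name)
        query.stop()
    return stopped
```

In a notebook cell this would be called as stop_active_streams(spark). If other streaming queries share the session, filter on query.name or query.id before stopping.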
Get a Range of Feature Values from Offline Feature Store
Pass from_source=True to bypass the offline store and compute features
on the fly against the raw data source. This is useful for testing the
expected output of feature values.
Use from_source=False (the default) to see what data is materialized in the
offline store.
result_dataframe = fv.get_historical_features(
    start_time=datetime(2022, 5, 1), end_time=datetime(2022, 5, 2), from_source=True
).to_pandas()
display(result_dataframe)
| | timestamp | user_id | amt | _effective_timestamp |
|---|---|---|---|---|
| 0 | 2022-05-01 01:50:51 | user_337750317412 | 76.45 | 2022-05-01 01:50:51 |
| 1 | 2022-05-01 02:05:39 | user_884240387242 | 45.8 | 2022-05-01 02:05:39 |
| 2 | 2022-05-01 02:41:42 | user_950482239421 | 52.31 | 2022-05-01 02:41:42 |
| 3 | 2022-05-01 03:51:28 | user_884240387242 | 1.43 | 2022-05-01 03:51:28 |
| 4 | 2022-05-01 04:48:27 | user_469998441571 | 64.15 | 2022-05-01 04:48:27 |
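One way to use the two from_source modes together is to read the same time window both ways and compare the rows. The helper below is a minimal, library-independent sketch of that comparison. diff_feature_rows and its parameters are hypothetical names, the rows are made up for illustration, and the input is assumed to be a list of dictionaries (for example, the result of the pandas call DataFrame.to_dict("records")).

```python
def diff_feature_rows(computed, materialized, key_cols, value_col, tol=1e-9):
    """Compare two lists of row dicts, matching rows on key_cols.

    Returns (missing, extra, mismatched):
      missing    - keys in `computed` that are absent from `materialized`
      extra      - keys in `materialized` that are absent from `computed`
      mismatched - keys in both whose value_col differs by more than tol
    Rows with duplicate keys collapse to the last one seen.
    """
    def index(rows):
        return {tuple(row[c] for c in key_cols): row[value_col] for row in rows}

    left, right = index(computed), index(materialized)
    missing = sorted(k for k in left if k not in right)
    extra = sorted(k for k in right if k not in left)
    mismatched = sorted(
        k for k in left if k in right and abs(left[k] - right[k]) > tol
    )
    return missing, extra, mismatched


# Toy rows, made up for illustration only (not real feature data).
computed = [
    {"user_id": "u1", "timestamp": "2022-05-01 01:00:00", "amt": 10.0},
    {"user_id": "u2", "timestamp": "2022-05-01 02:00:00", "amt": 20.0},
]
materialized = [
    {"user_id": "u1", "timestamp": "2022-05-01 01:00:00", "amt": 10.0},
    {"user_id": "u2", "timestamp": "2022-05-01 02:00:00", "amt": 25.0},
    {"user_id": "u3", "timestamp": "2022-05-01 03:00:00", "amt": 5.0},
]

missing, extra, mismatched = diff_feature_rows(
    computed, materialized, key_cols=["user_id", "timestamp"], value_col="amt"
)
print(missing)     # []
print(extra)       # [('u3', '2022-05-01 03:00:00')]
print(mismatched)  # [('u2', '2022-05-01 02:00:00')]
```

A difference is not always a bug: materialization may simply not have caught up with the raw source for the window being checked.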
Read the Latest Features from Online Feature Store
fv.get_online_features({"user_id": "user_930691958107"}).to_dict()
Out: {"amt": 180.6}
Read Historical Features from Offline Feature Store with Time-Travel
Create a spine DataFrame with the events to look up. For more information on
spines, see Selecting Sample Keys and Timestamps.
spine_df = pandas.DataFrame(
    {
        "user_id": ["user_930691958107", "user_131340471060"],
        "timestamp": [datetime(2022, 5, 1, 19), datetime(2022, 5, 6, 10)],
    }
)
display(spine_df)
| | user_id | timestamp |
|---|---|---|
| 0 | user_930691958107 | 2022-05-01 19:00:00 |
| 1 | user_131340471060 | 2022-05-06 10:00:00 |
Pass from_source=True to bypass the offline store and compute features
on the fly against the raw data source. This is slower than reading feature
data that has already been materialized to the offline store.
features_df = fv.get_historical_features(spine_df, from_source=True).to_pandas()
display(features_df)
| | user_id | timestamp | last_transaction_amount_sql__amt |
|---|---|---|---|
| 0 | user_131340471060 | 2022-05-06 10:00:00 | 31.67 |
| 1 | user_930691958107 | 2022-05-01 19:00:00 | 58.68 |
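The spine lookup is a point-in-time ("time-travel") join: each spine row receives the feature value as it was known at that row's timestamp, not a later one. The sketch below illustrates the idea for a "last value" feature in plain Python. It is not Tecton's implementation; last_value_as_of and the events are hypothetical, and treating an event exactly at the spine timestamp as visible is an assumption made here for simplicity.

```python
from bisect import bisect_right
from datetime import datetime


def last_value_as_of(events, spine):
    """For each (key, ts) in spine, find the latest event value with event_ts <= ts.

    events: list of (key, event_ts, value) tuples, in any order
    spine:  list of (key, ts) tuples
    Returns a list of (key, ts, value_or_None) in spine order.
    """
    # Group events per key, sorted by time, so each lookup is a binary search.
    by_key = {}
    for key, event_ts, value in sorted(events, key=lambda e: (e[0], e[1])):
        times, values = by_key.setdefault(key, ([], []))
        times.append(event_ts)
        values.append(value)

    out = []
    for key, ts in spine:
        times, values = by_key.get(key, ([], []))
        i = bisect_right(times, ts)  # count of events at or before ts
        out.append((key, ts, values[i - 1] if i else None))
    return out


# Hypothetical transactions, for illustration only.
events = [
    ("user_a", datetime(2022, 5, 1, 9), 12.5),
    ("user_a", datetime(2022, 5, 1, 18), 40.0),
    ("user_a", datetime(2022, 5, 2, 8), 99.0),  # after the spine time: ignored
    ("user_b", datetime(2022, 5, 7, 0), 7.0),   # after the spine time: no value
]
spine = [
    ("user_a", datetime(2022, 5, 1, 19)),
    ("user_b", datetime(2022, 5, 6, 10)),
]

result = last_value_as_of(events, spine)
for row in result:
    print(row)
```

Here user_a resolves to 40.0, the last transaction before 19:00 on 2022-05-01, and user_b resolves to None because its only transaction happens after the spine timestamp.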