Read Multiple Feature Vectors using Wildcards
This feature is not supported in Tecton on Snowflake.
If you are interested in this functionality, please file a feature request.
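Tecton allows you to fetch multiple feature vectors at once by specifying only a subset of a feature view's join keys. The omitted key acts as a wildcard, so one feature vector is returned for each matching value. This functionality is commonly used when multiple candidates need to be scored, such as in a recommendation system.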
In this example, we'll show how to retrieve feature vectors for all ads a user has seen in the past week. We'll walk through:
- Configuring the feature view with an online serving index
- Retrieving features online
- Creating training data
Configuring your feature views
First, when defining the feature view, set the online_serving_index parameter to the join keys you will provide at retrieval time, and omit the key you want to treat as a wildcard. In this case, we will specify the user at feature retrieval time and get back a row for each ad they have feature values for.
This example does not apply to Tecton on Snowflake because it does not support stream feature views. However, Tecton on Snowflake does support online_serving_index.
from tecton import stream_feature_view, Aggregation
from core.entities import user
from ads.entities import ad
from ads.data_sources.ad_impressions_stream import ad_impressions_stream
from datetime import datetime, timedelta
@stream_feature_view(
    source=ad_impressions_stream,
    entities=[user, ad],
    # Only user_uuid is supplied at retrieval time; ad_id acts as the wildcard key
    online_serving_index=["user_uuid"],
    mode="spark_sql",
    aggregation_interval=timedelta(hours=1),
    aggregations=[
        Aggregation(column="impression", function="count", time_window=timedelta(hours=1)),
        Aggregation(column="impression", function="count", time_window=timedelta(hours=12)),
        Aggregation(column="impression", function="count", time_window=timedelta(hours=24)),
        Aggregation(column="impression", function="count", time_window=timedelta(hours=72)),
        Aggregation(column="impression", function="count", time_window=timedelta(hours=168)),
    ],
    online=True,  # Must be enabled for the features to be served online
    offline=False,
    batch_schedule=timedelta(days=1),
    feature_start_time=datetime(2021, 1, 1),
    description="The count of impressions between a given user and a given ad",
)
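def user_ad_impression_counts(ad_impressions_stream):
    # The output keeps both join keys (user_uuid, ad_id) under the names
    # used by the entities and by online_serving_index.
    return f"""
        select
            user_uuid,
            ad_id,
            1 as impression,
            timestamp
        from
            {ad_impressions_stream}
        """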
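To make the effect of online_serving_index concrete, here is a toy sketch in plain Python. It is not Tecton's implementation: the in-memory dictionary, the helper names build_serving_index and query_features, and the feature name impression_count_24h are all hypothetical. It only illustrates the idea that rows stored under the full key (user_uuid, ad_id) can be looked up by user_uuid alone, returning one feature vector per ad.

```python
from collections import defaultdict

# Toy stand-in for an online store (hypothetical data, not Tecton's storage format).
# Each row is keyed by the full join key: (user_uuid, ad_id).
rows = {
    ("user-1", "ad-a"): {"impression_count_24h": 3},
    ("user-1", "ad-b"): {"impression_count_24h": 1},
    ("user-2", "ad-a"): {"impression_count_24h": 7},
}


def build_serving_index(rows):
    """Group rows by the indexed key (user_uuid); ad_id stays as the wildcard key."""
    index = defaultdict(dict)
    for (user_uuid, ad_id), features in rows.items():
        index[user_uuid][ad_id] = features
    return index


def query_features(index, user_uuid):
    """Return one feature vector per ad_id stored for the given user."""
    matches = index.get(user_uuid, {})
    return [
        {"user_uuid": user_uuid, "ad_id": ad_id, **features}
        for ad_id, features in sorted(matches.items())
    ]


index = build_serving_index(rows)
result = query_features(index, "user-1")
for row in result:
    print(row)
```

A user with no stored rows simply yields an empty list in this sketch, which mirrors the idea that a wildcard lookup returns only the ads a user actually has feature values for.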
Now that we've specified our serving indices for the Feature View, let's create our Feature Service to enable online retrieval.
from tecton import FeatureService

# Adjust this import path to wherever the feature view above is defined in your repo.
from feature_repo.shared.features.user_ad_impression_counts_wildcard import (
    user_ad_impression_counts,
)

ctr_prediction_service = FeatureService(
    name="ctr_prediction_service",
    description="A Feature Service used for supporting a CTR prediction model.",
    online_serving_enabled=True,
    features=[user_ad_impression_counts],
)
Fetching wildcard features online
Once those changes have been applied, we can use the Tecton Python SDK to retrieve a dataframe containing all the feature vectors that match our user by omitting the ad_id join key.
import tecton
ws = tecton.get_workspace("prod")
my_fs = ws.get_feature_service("ctr_prediction_service")
keys = {"user_uuid": "sample-user-uuid"}
response = my_fs.query_features(keys).to_pandas()
print(response.head())
Alternatively, we can use the HTTP API. See the section above for more detail on how to configure the API key.
$ export TECTON_API_KEY='<your_tecton_key>'
$ curl -X POST https://<your_cluster>.tecton.ai/api/v1/feature-service/get-features \
    -H "Authorization: Tecton-key $TECTON_API_KEY" -d \
'{
  "params": {
    "workspace_name": "prod",
    "feature_service_name": "ctr_prediction_service",
    "join_key_map": {
      "user_uuid": "sample-user-uuid"
    }
  }
}'
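The same request can be assembled from Python. The sketch below uses only the standard library and copies the endpoint path, header format, and payload shape from the curl example above; the helper name build_wildcard_request, the cluster URL, and the API key value are placeholders introduced for illustration. It builds the request without sending it, so it can be inspected safely.

```python
import json
import urllib.request


def build_wildcard_request(cluster_url, api_key, workspace, feature_service, join_keys):
    """Build (but do not send) the POST request shown in the curl example."""
    payload = {
        "params": {
            "workspace_name": workspace,
            "feature_service_name": feature_service,
            # Only the indexed join key is provided; ad_id is left out.
            "join_key_map": join_keys,
        }
    }
    return urllib.request.Request(
        url=f"{cluster_url}/api/v1/feature-service/get-features",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Authorization": f"Tecton-key {api_key}"},
        method="POST",
    )


request = build_wildcard_request(
    cluster_url="https://example-cluster.tecton.ai",  # hypothetical cluster URL
    api_key="dummy-key",  # placeholder; read the real key from your environment
    workspace="prod",
    feature_service="ctr_prediction_service",
    join_keys={"user_uuid": "sample-user-uuid"},
)
print(request.full_url)
print(request.data.decode("utf-8"))
```

To actually send it, pass the request to urllib.request.urlopen and parse the response body with json.load. Serializing the payload with json.dumps also avoids hand-written JSON mistakes such as trailing commas.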
Creating training sets with wildcard features
Similarly, we can construct our training dataset by providing a prediction context (a dataframe of events) that contains only the join key we specified as our serving index, plus a timestamp column.
import tecton

# Assumes an active SparkSession named `spark`, as in a Databricks or EMR notebook.
events = spark.read.parquet("dbfs:/event_data.pq").select("user_uuid", "timestamp")

ws = tecton.get_workspace("prod")
my_fs = ws.get_feature_service("ctr_prediction_service")

training_set = my_fs.get_historical_features(events, timestamp_key="timestamp")
print(training_set.to_pandas().head())
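Conceptually, a wildcard training join fans each event row out into one row per matching ad. The following plain-Python sketch illustrates that shape under simplifying assumptions: the data, the helper name expand_events, and the single impression_count value are hypothetical, and it picks the latest feature value at or before the event timestamp rather than computing the windowed aggregations defined in the feature view. It is not how Tecton computes training data.

```python
from datetime import datetime

# Hypothetical feature rows: (user_uuid, ad_id, feature_timestamp, impression_count)
feature_rows = [
    ("user-1", "ad-a", datetime(2021, 1, 1), 1),
    ("user-1", "ad-a", datetime(2021, 1, 3), 4),
    ("user-1", "ad-b", datetime(2021, 1, 2), 2),
    ("user-2", "ad-a", datetime(2021, 1, 1), 9),
]

# Events: one row per (user_uuid, timestamp). Note there is no ad_id column.
events = [("user-1", datetime(2021, 1, 2, 12))]


def expand_events(events, feature_rows):
    """Fan each event out to one row per ad_id, using values known at event time."""
    training_rows = []
    for user_uuid, event_ts in events:
        latest = {}  # ad_id -> (feature_timestamp, value)
        for f_user, ad_id, feature_ts, value in feature_rows:
            # Skip other users and any value from after the event (no leakage).
            if f_user != user_uuid or feature_ts > event_ts:
                continue
            if ad_id not in latest or feature_ts > latest[ad_id][0]:
                latest[ad_id] = (feature_ts, value)
        for ad_id in sorted(latest):
            training_rows.append((user_uuid, event_ts, ad_id, latest[ad_id][1]))
    return training_rows


training_rows = expand_events(events, feature_rows)
for row in training_rows:
    print(row)
```

Here the single event for user-1 produces two training rows, one for each ad, and the ad-a value recorded after the event is ignored.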