Publish Features to a Warehouse
This feature is currently in Public Preview.
Background
Historical feature data can be valuable for feature selection, model tuning, and experiment analysis. It enables feature time travel, the process of reconstructing all features of a record as it existed at specific points in time. It also helps identify feature and model drift, which can be important in dynamic systems where feature relevance evolves over time.
Tecton's SDK methods can retrieve historical features for feature development and training data generation. However, these methods require an input spine of events or a set of time ranges, which can make it difficult to use them to query and explore historical feature data.
Publish Full Features
With Tecton 0.8+, users can publish historical feature data for Batch & Stream Feature Views to Tecton's offline store for analysis, exploration, selection, and evaluation. Tecton publishes the feature data in blocks that align with the Feature View's materialization task windows.
Users can then read features directly from the offline store or set up external tables using a data warehouse or analytics environment of choice.
Publishing full features provides more flexibility to explore feature data from any environment. In addition, when the logic or underlying source data of a Feature View changes, its feature data is also automatically republished.
Enable Publishing Features
To publish full features for a Batch or Stream Feature View:
- Ensure offline materialization is enabled (offline=True).
- Set publish_full_features=True in the Feature View's OfflineStoreConfig configuration.
Please note that this will spin up new Feature Publish jobs to pre-compute full features for the full history of the Feature View. Optionally, specify a publish_start_time in the Feature View's OfflineStoreConfig to limit the range of historical feature data to publish.
Here is the example feature definition referenced throughout this doc:
from datetime import datetime, timedelta

from tecton import Aggregation, OfflineStoreConfig, batch_feature_view

# `transactions` (data source) and `user` (entity) are defined elsewhere in the feature repository.
@batch_feature_view(
    sources=[transactions],
    entities=[user],
    mode="spark_sql",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(function="mean", column="amt", time_window=timedelta(days=1)),
        Aggregation(function="mean", column="amt", time_window=timedelta(days=3)),
        Aggregation(function="mean", column="amt", time_window=timedelta(days=7)),
        Aggregation(function="count", column="transaction", time_window=timedelta(days=1)),
        Aggregation(function="count", column="transaction", time_window=timedelta(days=3)),
        Aggregation(function="count", column="transaction", time_window=timedelta(days=7)),
    ],
    feature_start_time=datetime(2022, 1, 1),
    offline=True,
    offline_store=OfflineStoreConfig(
        publish_full_features=True,
        publish_start_time=datetime(2023, 1, 1),
    ),
)
def user_transaction_metrics(transactions):
    return f"""
        SELECT user_id, timestamp, amt, 1 as transaction
        FROM {transactions}
        """
Feature Publish Jobs
Tecton schedules Feature Publish jobs for a time interval once the data in that interval has been successfully materialized to the offline store.
For example, consider a 7-day aggregate that is computed and updated daily. If publish_full_features is set to True, Tecton will automatically schedule a Feature Publish job after the Feature View has successfully materialized each day.
Tecton also intelligently republishes feature data when necessary. For example, if a user manually triggers rematerialization of a time interval from months ago, Tecton will schedule jobs to republish all features that depend on that interval's data once it has been successfully rematerialized.
Exploring Published Features in Analytics Environments
Storage Location
To find the storage location of published features, use fv.published_features_path or fv.summary():
import tecton

workspace = tecton.get_workspace("prod")
fv = workspace.get_feature_view("user_transaction_metrics")

fv.published_features_path
# "s3://dataplane/offline-store/ws/prod/published-features/045asdfa34/data"
External Tables
Once features are published, users can set up external tables in the data warehouse of their choice to query them directly.
Published features are stored as a Delta table and are partitioned by their _valid_from timestamp, which represents the time at which the feature value becomes available for online serving.
Depending on the data warehouse, users may need to enumerate the schema when creating the external table. In addition to the feature columns, the table also contains the feature validity range defined by the _valid_from and _valid_to columns, as well as the time_partition column, which partitions the features by their _valid_from timestamp. Adding time_partition boundaries to the query predicate can significantly improve query performance.
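For example, a query that prunes on time_partition might look like the following. This is a minimal sketch using Spark SQL against the published Delta table (the path is the one shown above; the column selection and date range are illustrative), and the same predicate shape applies to the warehouse integrations below.
publish_store = "s3://dataplane/offline-store/ws/prod/published-features/045asdfa34/data"

# Restrict the scan to one month of partitions instead of the full history.
df = spark.sql(
    f"""
    SELECT user_id, amt_mean_7d_1d, _valid_from, _valid_to
    FROM delta.`{publish_store}`
    WHERE time_partition >= '2023-01-01'
      AND time_partition < '2023-02-01'
    """
)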
Snowflake
Snowflake needs read access to the data lake where the published features are stored. To grant it, create a Snowflake Storage Integration by following the Snowflake storage integration documentation:
CREATE STORAGE INTEGRATION tecton_s3_int_read_only
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::001234567890:role/myrole'
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = ('s3://dataplane/offline-store/ws/prod/published-features/');
Then, create an external stage using the storage integration and the location of the published features:
CREATE OR REPLACE STAGE user_transaction_metrics_ext_stage
URL = 's3://dataplane/offline-store/ws/prod/published-features/045asdfa34/data'
STORAGE_INTEGRATION = tecton_s3_int_read_only
FILE_FORMAT = (TYPE = PARQUET);
Finally, create an external table to query the features:
CREATE OR REPLACE EXTERNAL TABLE user_transaction_metrics (
USER_ID VARCHAR as (value:user_id::varchar),
AMT_MEAN_1D_1D NUMBER as (value:amt_mean_1d_1d::number),
AMT_MEAN_3D_1D NUMBER as (value:amt_mean_3d_1d::number),
AMT_MEAN_7D_1D NUMBER as (value:amt_mean_7d_1d::number),
TRANSACTION_COUNT_1D_1D NUMBER as (value:transaction_count_1d_1d::number),
TRANSACTION_COUNT_3D_1D NUMBER as (value:transaction_count_3d_1d::number),
TRANSACTION_COUNT_7D_1D NUMBER as (value:transaction_count_7d_1d::number),
VALID_FROM TIMESTAMP_NTZ as (value:_valid_from::TIMESTAMP_NTZ),
VALID_TO TIMESTAMP_NTZ as (value:_valid_to::TIMESTAMP_NTZ),
TIME_PARTITION DATE as (
try_cast(
split_part(
split_part(metadata$filename, 'time_partition=', 2),
'/',
1
) as DATE
)
)
)
PARTITION BY (TIME_PARTITION)
WITH LOCATION=@user_transaction_metrics_ext_stage
REFRESH_ON_CREATE = false
AUTO_REFRESH = false
FILE_FORMAT = (type = parquet)
TABLE_FORMAT = DELTA;
Tecton is currently implementing a Snowflake integration to automatically create and refresh the external table. For now, users need to manually refresh the external table to keep Snowflake up-to-date with the Delta transaction logs:
ALTER EXTERNAL TABLE user_transaction_metrics REFRESH;
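Until that integration is available, the refresh can be scripted. Below is a minimal sketch using the snowflake-connector-python package; the connection parameters are placeholders, and the script is assumed to run on a schedule (for example, after Feature Publish jobs complete).
import snowflake.connector

# Placeholder credentials and context; supply your own account details.
conn = snowflake.connector.connect(
    account="my_account",
    user="my_user",
    password="xxx",
    warehouse="my_warehouse",
    database="my_database",
    schema="public",
)
try:
    # Re-scan the Delta transaction log so newly published features become queryable.
    conn.cursor().execute("ALTER EXTERNAL TABLE user_transaction_metrics REFRESH")
finally:
    conn.close()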
Amazon Athena
Assuming Athena has read access to the data lake where the published features are stored, create an Athena external table to query the features:
CREATE EXTERNAL TABLE default.user_transaction_metrics
LOCATION 's3://dataplane/offline-store/ws/prod/published-features/045asdfa34/data'
TBLPROPERTIES ('table_type' = 'DELTA')
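Once the table exists, it can be queried like any other Athena table, and filtering on time_partition limits the data scanned. A minimal sketch using boto3 (the region and query-results bucket are placeholders):
import boto3

athena = boto3.client("athena", region_name="us-west-2")  # placeholder region

# Query one day of published features; results land in the configured S3 location.
response = athena.start_query_execution(
    QueryString="""
        SELECT user_id, amt_mean_7d_1d, _valid_from, _valid_to
        FROM default.user_transaction_metrics
        WHERE time_partition = DATE '2023-01-01'
    """,
    QueryExecutionContext={"Database": "default"},
    ResultConfiguration={"OutputLocation": "s3://my-athena-query-results/"},  # placeholder bucket
)
print(response["QueryExecutionId"])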
Redshift
Users can use Amazon Redshift Spectrum to query data directly from files on Amazon S3.
For more information, see Getting started with Amazon Redshift Spectrum and Redshift Spectrum to Delta Lake integration guides.
After creating an IAM role for Redshift Spectrum, create an external schema:
create external schema tecton_ext_features
from data catalog
database 'tecton_features_db'
iam_role 'arn:aws:iam::1234567:role/tecton_spectrum_role'
create external database if not exists;
Then, create an external table to query the features:
CREATE EXTERNAL TABLE tecton_ext_features.user_transaction_metrics (
user_id VARCHAR,
amt_mean_1d_1d DOUBLE PRECISION,
amt_mean_3d_1d DOUBLE PRECISION,
amt_mean_7d_1d DOUBLE PRECISION,
transaction_count_1d_1d BIGINT,
transaction_count_3d_1d BIGINT,
transaction_count_7d_1d BIGINT,
_valid_from TIMESTAMP,
_valid_to TIMESTAMP
)
PARTITIONED BY(time_partition DATE)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://dataplane/offline-store/ws/prod/published-features/045asdfa34/data/_symlink_format_manifest';
Add partitions to the Redshift Spectrum table explicitly, or use a Glue crawler or scheduled jobs to add partitions automatically as they become available in S3:
ALTER TABLE tecton_ext_features.user_transaction_metrics
ADD IF NOT EXISTS PARTITION(time_partition='2023-01-01')
LOCATION
's3://dataplane/offline-store/ws/prod/published-features/045asdfa34/data/_symlink_format_manifest/time_partition=2023-01-01/manifest';
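Registering many partitions one at a time is tedious, so one option is to generate the ALTER TABLE statements with a short script. A minimal sketch (the manifest path mirrors the example above; the date range and the mechanism used to execute the statements against Redshift are up to you):
from datetime import date, timedelta

MANIFEST_ROOT = (
    "s3://dataplane/offline-store/ws/prod/published-features/045asdfa34"
    "/data/_symlink_format_manifest"
)
start, end = date(2023, 1, 1), date(2023, 1, 31)  # illustrative range

statements = []
day = start
while day <= end:
    statements.append(
        "ALTER TABLE tecton_ext_features.user_transaction_metrics "
        f"ADD IF NOT EXISTS PARTITION(time_partition='{day}') "
        f"LOCATION '{MANIFEST_ROOT}/time_partition={day}/manifest';"
    )
    day += timedelta(days=1)

# Execute these against Redshift, e.g. via the Redshift Data API or psycopg2.
print("\n".join(statements))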
Python
Users can also use the deltalake Python library (docs) to read the published features directly from the data lake without needing Spark.
from deltalake import DeltaTable

table_uri = "s3://dataplane/offline-store/ws/prod/published-features/045asdfa34/data"
storage_options = {
    "AWS_ACCESS_KEY_ID": "xxx",
    "AWS_SECRET_ACCESS_KEY": "xxx",
    "AWS_SESSION_TOKEN": "xxx",
}
table = DeltaTable(table_uri, storage_options=storage_options)

table.to_pandas()
...
table.to_pyarrow_table()
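As with the warehouse integrations, restricting reads to specific time_partition values keeps them fast. A minimal sketch using the library's partition filtering (the date and column list are illustrative):
# Read a single partition and only the columns needed for analysis.
df = table.to_pandas(
    partitions=[("time_partition", "=", "2023-01-01")],
    columns=["user_id", "amt_mean_7d_1d", "_valid_from", "_valid_to"],
)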
Spark SQL
publish_store = "s3://dataplane/offline-store/ws/prod/published-features/045asdfa34/data"
df = spark.sql(f"SELECT * FROM delta.`{publish_store}`")