Debugging Example: A Feature Start Time Bug
This page applies only to Tecton on Spark.
This page shows how to debug a query generated by get_historical_features() that is not working as expected.
The setup code below is required by the examples that follow.
from tecton import FileConfig, BatchSource, Entity, batch_feature_view, FilteredSource, Aggregation
from datetime import datetime, timedelta

batch_config = FileConfig(
    uri="s3://tecton.ai.public/tutorials/fraud_demo/transactions/data.pq",
    file_format="parquet",
    timestamp_field="timestamp",
)

transactions_batch = BatchSource(name="transactions_batch", batch_config=batch_config)

user = Entity(name="fraud_user", join_keys=["user_id"])


@batch_feature_view(
    sources=[FilteredSource(transactions_batch)],
    entities=[user],
    mode="spark_sql",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="transaction", function="count", time_window=timedelta(days=1)),
        Aggregation(column="transaction", function="count", time_window=timedelta(days=30)),
        Aggregation(column="transaction", function="count", time_window=timedelta(days=90)),
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 4, 30),
)
def user_transaction_counts(transactions):
    return f"""
        SELECT
            user_id,
            1 as transaction,
            timestamp
        FROM
            {transactions}
        """
The call to get_historical_features() that is not working as expected
Build a spine and then call get_historical_features() with the spine:
import tecton
import pandas as pd
from datetime import datetime

tecton.set_validation_mode("auto")

spine = pd.DataFrame.from_dict(
    {
        "timestamp": [datetime(2020, 1, 1, 0, 0, 0)],
        "user_id": ["user_268308151877"],
    }
)

df = user_transaction_counts.get_historical_features(spine=spine)
The debugging process
df.to_spark().show()
user_id | timestamp | user_transaction_counts__transaction_count_1d_1d | user_transaction_counts__transaction_count_30d_1d | user_transaction_counts__transaction_count_90d_1d |
---|---|---|---|---|
user_268308151877 | 2020-01-01 00:00:00 | null | null | null |
Unexpectedly, null values are returned for the features. To debug this result, show the query plan:
df.explain()
<1> RenameColsNode: Rename columns with map {'transaction_count_1d_1d': 'user_transaction_counts__transaction_count_1d_1d', 'transaction_count_30d_1d': 'user_transaction_counts__transaction_count_30d_1d', 'transaction_count_90d_1d': 'user_transaction_counts__transaction_count_90d_1d'}. Drop columns ['_anchor_time'].
└── <2> RespectFeatureStartTimeNode: Respect the feature start time for all rows where '_anchor_time' < 2022-04-29T00:00:00+00:00 by setting all feature columns for those rows to NULL
    └── <3> AsofJoinFullAggNode(spine, partial_aggregates): Spine asof join partial aggregates, where the join condition is partial_aggregates._anchor_time <= spine._anchor_time and partial aggregates are rolled up to compute full aggregates
        ├── <4> [spine] AddRetrievalAnchorTimeNode: Add anchor time column '_anchor_time' to represent the most recent feature data available for retrieval. The time at which feature data becomes available for retrieval depends on two factors: the frequency at which the feature view is materialized, and the data delay. Since 'user_transaction_counts' is a batch feature view with aggregations, feature data is stored in tiles. Each tile has size equal to the tile interval, which is 86400 seconds. The anchor time column contains the start time of the most recent tile available for retrieval. Let T be the timestamp column 'timestamp'. The anchor time column is calculated as T - (T % batch_schedule) - tile_interval where batch_schedule = 86400 seconds.
        │   └── <5> UserSpecifiedDataNode: User provided data with columns timestamp|user_id
        └── <6> [partial_aggregates] PartialAggNode: Perform partial aggregations with column '_anchor_time' as the start time of tiles.
            └── <7> EntityFilterNode(feature_data, entities): Filter feature data by entities with respect to ['user_id']:
                ├── <8> [feature_data] FeatureViewPipelineNode(transactions): Evaluate feature view pipeline 'user_transaction_counts'
                │   └── <9> [transactions] DataSourceScanNode: Scan data source 'transactions_batch' and apply time range filter [2020-01-02T00:00:00+00:00, 2020-01-02T00:00:00+00:00). WARNING: since start time >= end time, no rows will be returned.
                └── <10> [entities] SelectDistinctNode: Select distinct with columns ['user_id'].
                    └── <11> AddRetrievalAnchorTimeNode: Add anchor time column '_anchor_time' to represent the most recent feature data available for retrieval. The time at which feature data becomes available for retrieval depends on two factors: the frequency at which the feature view is materialized, and the data delay. Since 'user_transaction_counts' is a batch feature view with aggregations, feature data is stored in tiles. Each tile has size equal to the tile interval, which is 86400 seconds. The anchor time column contains the start time of the most recent tile available for retrieval. Let T be the timestamp column 'timestamp'. The anchor time column is calculated as T - (T % batch_schedule) - tile_interval where batch_schedule = 86400 seconds.
                        └── <12> UserSpecifiedDataNode: User provided data with columns timestamp|user_id
Node 9 reveals a potential issue with the DataSourceScanNode. Its time range filter is set to [2020-01-02T00:00:00+00:00, 2020-01-02T00:00:00+00:00), and as a result the node includes the following warning: since start time >= end time, no rows will be returned.
Since node 9 returns no rows, nodes 8, 7, and 6 are also empty. The right side of the join is therefore empty, which is why the feature columns in the result are null.
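To illustrate why an empty right side produces null features, here is a minimal sketch of an asof left join in plain Python. It is not Tecton's implementation: the function `asof_left_join`, the `count` column, and the sample anchor times are hypothetical, and the sketch skips the rollup of partial aggregates that node 3 also performs. It only applies the join condition from the plan (feature anchor time <= spine anchor time).

```python
from datetime import datetime


def asof_left_join(spine_rows, feature_rows):
    """For each spine row, attach the latest feature row for the same user_id
    whose anchor time is <= the spine anchor time. If no such row exists,
    the feature value is None (the equivalent of SQL NULL)."""
    joined = []
    for spine in spine_rows:
        candidates = [
            f
            for f in feature_rows
            if f["user_id"] == spine["user_id"]
            and f["_anchor_time"] <= spine["_anchor_time"]
        ]
        # max() with default=None returns None when there are no candidates.
        best = max(candidates, key=lambda f: f["_anchor_time"], default=None)
        joined.append({**spine, "count": best["count"] if best else None})
    return joined


# Illustrative spine row; the anchor time value is an assumption for the demo.
spine_rows = [{"user_id": "user_268308151877", "_anchor_time": datetime(2019, 12, 31)}]

# An empty right side (as in nodes 6 to 9) leaves every spine row unmatched.
empty_result = asof_left_join(spine_rows, [])

# With a matching feature row, the value is carried over to the spine row.
feature_rows = [{"user_id": "user_268308151877", "_anchor_time": datetime(2019, 12, 30), "count": 3}]
matched_result = asof_left_join(spine_rows, feature_rows)
```

The spine rows always survive the join; only the feature values go missing, which matches the single row of nulls shown earlier.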
Why might the time range filter have a start time of 2020-01-02T00:00:00+00:00? When there is an issue with the start time of a time range, the feature start time is often the root cause. Here, the feature start time is 2022-04-30T00:00:00+00:00, whereas the spine timestamp is 2020-01-01T00:00:00+00:00. The feature start time is more than two years after the spine timestamp, which is suspicious.
Hypothesis: The issue appears to be that the spine has a timestamp that is before the feature start time.
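The hypothesis can also be checked with a little arithmetic. The following is a minimal sketch, not Tecton's implementation: it applies the anchor time formula quoted in nodes 4 and 11 of the plan to a spine timestamp and compares the result with the cutoff printed in node 2. The names `anchor_time`, `is_nulled`, and `CUTOFF` are hypothetical, and all timestamps are assumed to be UTC.

```python
from datetime import datetime, timedelta, timezone

# Values copied from the query plan above.
BATCH_SCHEDULE = timedelta(seconds=86400)
TILE_INTERVAL = timedelta(seconds=86400)
CUTOFF = datetime(2022, 4, 29, tzinfo=timezone.utc)  # from node 2


def anchor_time(ts):
    """Apply T - (T % batch_schedule) - tile_interval to an aware datetime."""
    epoch = int(ts.timestamp())
    schedule = int(BATCH_SCHEDULE.total_seconds())
    tile = int(TILE_INTERVAL.total_seconds())
    return datetime.fromtimestamp(epoch - (epoch % schedule) - tile, tz=timezone.utc)


def is_nulled(ts):
    """True when node 2 would set the feature columns to NULL for this row."""
    return anchor_time(ts) < CUTOFF


failing_ts = datetime(2020, 1, 1, tzinfo=timezone.utc)
print(anchor_time(failing_ts), is_nulled(failing_ts))
# 2019-12-31 00:00:00+00:00 True

later_ts = datetime(2022, 10, 1, tzinfo=timezone.utc)
print(anchor_time(later_ts), is_nulled(later_ts))
# 2022-09-30 00:00:00+00:00 False
```

Under these assumptions, the anchor time for the original spine row falls well before the cutoff, which is consistent with the hypothesis.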
Inspecting the query plan to confirm the hypothesis
You can confirm that the time range filter is the issue by inspecting the contents of node 9:
feature_data = df.subtree(9)
feature_data.to_spark().show()
+-------+--------------+--------+---+--------+--------+---------+----------+---------+
|user_id|transaction_id|category|amt|is_fraud|merchant|merch_lat|merch_long|timestamp|
+-------+--------------+--------+---+--------+--------+---------+----------+---------+
+-------+--------------+--------+---+--------+--------+---------+----------+---------+
The empty output confirms that node 9 returns no data from the data source, which is not what is expected.
Correcting the original query to fix the issue
You can correct the original query by specifying a spine timestamp that is after the feature start time:
import tecton
import pandas as pd
from datetime import datetime

tecton.set_validation_mode("auto")

spine = pd.DataFrame.from_dict(
    {
        "timestamp": [datetime(2022, 10, 1, 0, 0, 0)],
        "user_id": ["user_268308151877"],
    }
)

df = user_transaction_counts.get_historical_features(spine=spine)
df.to_spark().show()
user_id | timestamp | user_transaction_counts__transaction_count_1d_1d | user_transaction_counts__transaction_count_30d_1d | user_transaction_counts__transaction_count_90d_1d |
---|---|---|---|---|
user_268308151877 | 2022-10-01 00:00:00 | 0 | 13 | 36 |
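To catch this kind of problem before running a query, one option is to check the spine timestamps against the feature start time up front. The helper below is a hypothetical sketch in plain Python, not part of the Tecton SDK. It assumes naive datetimes in the same time zone as the feature start time from the setup code, and it uses a simple comparison that does not account for tile alignment.

```python
from datetime import datetime

# Feature start time from the feature view definition in the setup code.
FEATURE_START_TIME = datetime(2022, 4, 30)


def timestamps_before_feature_start(timestamps, feature_start_time=FEATURE_START_TIME):
    """Return the timestamps that fall before the feature start time."""
    return [ts for ts in timestamps if ts < feature_start_time]


# The first value is the original spine timestamp, the second the corrected one.
early = timestamps_before_feature_start(
    [datetime(2020, 1, 1, 0, 0, 0), datetime(2022, 10, 1, 0, 0, 0)]
)
print(early)  # [datetime.datetime(2020, 1, 1, 0, 0)]
```

Any timestamps the helper returns are candidates for rows whose features will come back as null.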