Transformations
Overview
A transformation is a function that specifies logic to run against data retrieved from external data sources.
By default, transformations are inlined into Feature Views.
The following example shows a Feature View that implements a transformation in the body of the Feature View function my_feature_view. The transformation runs in spark_sql mode and renames columns from the data source to feature_one and feature_two.
@batch_feature_view(
    mode="spark_sql",
    # ...
)
def my_feature_view(input_data):
    return f"""
        SELECT
            entity_id,
            timestamp,
            column_a AS feature_one,
            column_b AS feature_two
        FROM {input_data}
        """
Alternatively, transformations can be defined outside of Feature Views, in Tecton objects identified with the @transformation decorator. This allows transformations to be modular, discoverable in Tecton's Web UI, and reusable across multiple Feature Views.
Transformation input and output
Input
The input to a transformation contains the columns in the data source.
Output
When a transformation is defined inside of a Feature View, the output of the transformation is a DataFrame that must include the following (illustrated in the sketch after this list):
- The join keys of all entities included in the entities list.
- A timestamp column. If there is more than one timestamp column, a timestamp_key parameter must be set to specify which column is the correct timestamp of the feature values.
- Feature value columns. All columns other than the join keys and timestamp are considered features of the Feature View.
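For illustration, a minimal spark_sql sketch whose output meets these requirements; the user entity (join key user_id), the signups source, and the column names are hypothetical:
@batch_feature_view(
    mode="spark_sql",
    entities=[user],  # hypothetical entity whose join key is user_id
    # ...
)
def user_signup_features(signups):
    # Output columns: user_id (join key), timestamp (the timestamp column),
    # and signup_count (treated as a feature value).
    return f"""
        SELECT
            user_id,
            timestamp,
            signup_count
        FROM {signups}
        """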
Modes
A transformation mode specifies the format in which a transformation must be written. For example, in spark_sql mode, a transformation is written in SQL, while in pyspark mode, a transformation is written using the PySpark DataFrame API.
This page describes the transformation modes that are supported by transformations defined inside and outside of Feature Views.
The examples show transformations defined inside of Feature Views.
Modes for Batch Feature Views and Stream Feature Views
mode="spark_sql" and mode="snowflake_sql"
Characteristic | Description |
---|---|
Summary | Contains a SQL query |
Supported Feature View types | Batch Feature View, Stream Feature View. mode="snowflake_sql" is not supported in Stream Feature Views. |
Supported data platforms | Databricks, EMR, Snowflake |
Input type | A string (the name of a view generated by Tecton) |
Output type | A string |
Example
Spark:
@batch_feature_view(
    mode="spark_sql",
    # ...
)
def user_has_good_credit(credit_scores):
    return f"""
        SELECT
            user_id,
            IF (credit_score > 670, 1, 0) as user_has_good_credit,
            date as timestamp
        FROM
            {credit_scores}
        """
Snowflake:
@batch_feature_view(
    mode="snowflake_sql",
    # ...
)
def user_has_good_credit(credit_scores):
    return f"""
        SELECT
            user_id,
            IFF (credit_score > 670, 1, 0) as user_has_good_credit,
            date as timestamp
        FROM
            {credit_scores}
        """
mode="pyspark"
β
Characteristic | Description |
---|---|
Summary | Contains Python code that is executed within a Spark context. |
Supported Feature View types | Batch Feature View, Stream Feature View |
Supported data platforms | Databricks, EMR |
Input type | A Spark DataFrame or a Tecton constant |
Output type | A Spark DataFrame |
Notes | Third party libraries can be included in user-defined PySpark functions if your cluster allows third party libraries. |
Example
@batch_feature_view(
    mode="pyspark",
    # ...
)
def user_has_good_credit(credit_scores):
    from pyspark.sql import functions as F

    df = credit_scores.withColumn(
        "user_has_good_credit",
        F.when(credit_scores["credit_score"] > 670, 1).otherwise(0),
    )
    return df.select("user_id", df["date"].alias("timestamp"), "user_has_good_credit")
mode="snowpark"
β
Characteristic | Description |
---|---|
Summary | Contains Python code that is executed in Snowpark, using the Snowpark API for Python. |
Supported Feature View Types | Batch Feature View |
Supported data platforms | Snowflake |
Input type | A snowflake.snowpark.DataFrame or a Tecton constant |
Output type | A snowflake.snowpark.DataFrame |
Notes | The transformation function can call functions that are defined in Snowflake (see the sketch after the example below). |
Example
@batch_feature_view(
    mode="snowpark",
    # ...
)
def user_has_good_credit(credit_scores):
    from snowflake.snowpark.functions import when, col

    df = credit_scores.withColumn(
        "user_has_good_credit",
        when(col("credit_score") > 670, 1).otherwise(0),
    )
    return df.select("user_id", "user_has_good_credit", "timestamp")
Modes for On Demand Feature Views
mode="pandas"
Characteristic | Description |
---|---|
Summary | Contains Python code that operates on a Pandas DataFrame |
Supported Feature View Types | On Demand Feature View |
Supported data platforms | Databricks, EMR, Snowflake |
Input type | A Pandas DataFrame or a Tecton constant |
Output type | A Pandas DataFrame |
Example
@on_demand_feature_view(
    mode="pandas",
    # ...
)
def transaction_amount_is_high(transaction_request):
    import pandas as pd

    df = pd.DataFrame()
    df["transaction_amount_is_high"] = (transaction_request["amount"] >= 10000).astype("int64")
    return df
mode="python"
β
Characteristic | Description |
---|---|
Summary | Contains Python code that operates on a dictionary |
Supported Feature View Types | On Demand Feature View |
Supported data platforms | Databricks, EMR, Snowflake |
Input type | A dictionary |
Output type | A dictionary |
Example
@on_demand_feature_view(
    mode="python",
    # ...
)
def user_age(request, user_date_of_birth):
    from datetime import datetime, date

    request_datetime = datetime.fromisoformat(request["timestamp"]).replace(tzinfo=None)
    dob_datetime = datetime.fromisoformat(user_date_of_birth["USER_DATE_OF_BIRTH"])
    td = request_datetime - dob_datetime
    return {"user_age": td.days}
Defining Transformations Outside of Feature Views
Compared to defining a transformation inside of a Feature View, the main advantages of defining a transformation outside of a Feature View are:
- Reusability:
  - Transformations can be reused by multiple Feature Views.
  - A Feature View can call multiple transformations.
- Discoverability: Transformations can be searched in the Web UI.
To define a transformation outside of a Feature View, define a function using the @transformation decorator, with mode set to a supported transformation mode. For example:
@transformation(mode="spark_sql")
def my_transformation(input_data):
    return f"""
        SELECT
            entity_id,
            timestamp,
            column_a AS feature_one,
            column_b AS feature_two
        FROM {input_data}
        """
In the definition of the Feature View that will use the transformation, specify mode="pipeline", and call the transformation function from the Feature View function.
@batch_feature_view(mode="pipeline", ...)
def my_feature_view(input_data):
    return my_transformation(input_data)
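Because my_transformation is a standalone Tecton object, it can also be reused by other Feature Views. A minimal sketch; my_other_feature_view and its input other_input_data are hypothetical:
@batch_feature_view(mode="pipeline", ...)
def my_other_feature_view(other_input_data):
    # Reuses the same transformation against a different data source.
    return my_transformation(other_input_data)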
Guidelines for using @transformation functions
- @transformation function outputs can only be referenced inside mode="pipeline" Feature View definitions.
- A @transformation function cannot reference another @transformation function. Instead, chained invocations should be declared entirely inside a mode="pipeline" Feature View definition. You can also convert an upstream @transformation to a native Python function in order to reference it inside a @transformation.
- mode="pipeline" Feature View definitions cannot invoke any PySpark operations. They can only pass DataFrame objects and Tecton const types in and out of upstream @transformations. (A sketch contrasting an invalid pipeline with a valid one follows this list.)
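To make the last guideline concrete, here is a sketch contrasting a pipeline that would be rejected with one that follows the rules; select_columns and add_flag stand in for hypothetical pyspark @transformation functions:
# Not allowed: a mode="pipeline" Feature View cannot invoke PySpark methods
# such as withColumn on the output of a @transformation.
@batch_feature_view(mode="pipeline", ...)
def invalid_fv(input_df):
    from pyspark.sql import functions as F

    return select_columns(input_df).withColumn("flag", F.lit(1))


# Allowed: the PySpark logic lives inside the @transformation functions, and
# the pipeline only chains their outputs.
@batch_feature_view(mode="pipeline", ...)
def valid_fv(input_df):
    return add_flag(select_columns(input_df))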
An alternative to using a @transformation function
As an alternative to implementing a transformation function using the @transformation decorator, a native Python function can be used (a sketch follows the list below). Native Python functions:
- Can be called from inside Feature View definitions (that use any mode).
- Can be called from @transformation functions.
- Are not discoverable in the Web UI.
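For example, a minimal sketch of a native Python helper used inside a python-mode On Demand Feature View; the amount_is_high helper and the transaction_request source are hypothetical:
def amount_is_high(amount):
    # Plain Python helper: not a Tecton object, so it is not shown in the Web UI.
    return 1 if amount >= 10000 else 0


@on_demand_feature_view(
    mode="python",
    # ...
)
def transaction_flags(transaction_request):
    return {"transaction_amount_is_high": amount_is_high(transaction_request["amount"])}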
Imports and Dependencies
Importing Python modules into transformations
Transformations support the pandas and numpy modules only, and these modules can only be used in Pandas transformations.
Python modules must be imported inside the transformation function.
Avoid using aliases for imports (e.g. use import pandas instead of import pandas as pd).
Any modules used for type annotations in function signatures must be imported outside the function.
In the following example, the pandas module is imported in two places:
- Inside of the transformation function, because the function uses the pandas module.
- Outside of the transformation function, because pandas type annotations are used in the function signature (my_transformation(request: pandas.DataFrame) -> pandas.DataFrame).
from tecton import transformation
import pandas  # required for type hints on my_transformation.


@transformation(mode="pandas")
def my_transformation(request: pandas.DataFrame) -> pandas.DataFrame:
    import pandas  # required for pandas.DataFrame() below.

    df = pandas.DataFrame()
    df["amount_is_high"] = (request["amount"] >= 10000).astype("int64")
    return df
Importing Python objects into transformation functions
Object imports must be done outside of the transformation definition.
The following imports of objects into transformation functions are allowed:
- Functions
- Constants
The following imports of objects into transformation functions are not allowed:
- Classes
- Class instances
- Enums
In the following example, my_func, my_int_const, my_string_const, and my_dict_const are imported from my_local_module. The import takes place outside of the transformation function.
from tecton import transformation
import pandas  # required for type hints on my_transformation.
from my_local_module import my_func, my_int_const, my_string_const, my_dict_const


@transformation(mode="pandas")
def my_transformation(request: pandas.DataFrame) -> pandas.DataFrame:
    import pandas  # required for pandas.DataFrame() below.

    df = pandas.DataFrame()
    df[my_dict_const["resultval"]] = my_func(request[my_string_const] >= my_int_const)
    return df
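For contrast, a sketch of an import that the rules above disallow; MyClass and my_local_module are hypothetical:
from tecton import transformation
import pandas  # required for type hints on my_transformation.
from my_local_module import MyClass  # not allowed: classes cannot be imported into transformation functions


@transformation(mode="pandas")
def my_transformation(request: pandas.DataFrame) -> pandas.DataFrame:
    import pandas

    df = pandas.DataFrame()
    # Relying on an imported class instance is the pattern the rules above disallow.
    df["result"] = MyClass().compute(request["amount"])
    return df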
Using Third Party Dependencies in PySpark Transformations
Overview
You can use third party Python packages in UDFs used by PySpark transformations (mode="pyspark") by declaring them in extra_pip_dependencies within the cluster config object, which is either tecton.EMRClusterConfig or tecton.DatabricksClusterConfig. An example follows:
new_emr = EMRClusterConfig(
    instance_type="m4.xlarge",
    number_of_workers=4,
    extra_pip_dependencies=["tensorflow==2.2.0"],
)
In extra_pip_dependencies, you can also specify Amazon S3 URIs to import your own modules. Ensure that the Tecton Spark IAM role or your Tecton Databricks instance profile has access to read from the S3 URI.
To use your own module, package it as a Python wheel or egg file in S3.
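For example, a sketch that adds a privately packaged wheel from S3 alongside a PyPI package; the bucket and wheel path are hypothetical placeholders:
new_emr = EMRClusterConfig(
    instance_type="m4.xlarge",
    number_of_workers=4,
    extra_pip_dependencies=[
        "tensorflow==2.2.0",
        # Hypothetical S3 URI pointing to your own packaged module.
        "s3://your-bucket/packages/my_module-0.1.0-py3-none-any.whl",
    ],
)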
Using a third party dependency inside of a Feature View transformation
In this example, we use the log1p function from TensorFlow in a UDF that is defined inside of a PySpark Feature View transformation.
Note that the import statement has to be within the UDF body; placing it at the top of the file or inside the transformation function will not work.
from datetime import datetime, timedelta
from tecton import batch_feature_view, EMRClusterConfig, FilteredSource
from shared import entities, data_sources

new_emr = EMRClusterConfig(
    instance_type="m4.xlarge",
    number_of_workers=4,
    extra_pip_dependencies=["tensorflow==2.2.0"],
)


@batch_feature_view(
    mode="pyspark",
    sources=[FilteredSource(source=data_sources.user_clicks)],
    entities=[entities.product],
    online=False,
    offline=False,
    feature_start_time=datetime(year=2021, month=1, day=1),
    batch_schedule=timedelta(days=1),
    ttl=timedelta(days=30),
    batch_compute=new_emr,
)
def my_example_feature_view(user_clicks):
    from pyspark.sql import functions as F
    from pyspark.sql.types import LongType

    def my_udf(x):
        # The third party import must be inside the UDF body.
        import tensorflow as tf

        return int(tf.math.log1p(float(x)).numpy())

    extra_udf = F.udf(my_udf, LongType())

    return user_clicks.select(
        "product_id",
        extra_udf("clicked").alias("log1p_clicked"),
        F.to_utc_timestamp(F.col("timestamp"), "UTC").alias("timestamp"),
    )
Examples of @transformation functions
The following code is referenced by the examples that follow it. ds_func is a data source function. The function generates a DataFrame, which is used by the transformations in the examples.
from tecton import spark_batch_config, BatchSource, Entity


@spark_batch_config()
def ds_func(spark):
    import pandas as pd

    content_ids = ["c1", "c2"]
    other_vals = [1, 2]
    ts = [pd.Timestamp("2022-01-01T12"), pd.Timestamp("2022-01-02T12")]
    return spark.createDataFrame(
        pd.DataFrame({"content_id": content_ids, "other_vals": other_vals, "event_timestamp": ts})
    )


test_source = BatchSource(name="test_source", batch_config=ds_func)

content = Entity(name="Content", join_keys=["content_id"])
A Feature View that calls a pyspark @transformation
@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")
@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="other_vals", function="sum", time_window=timedelta(days=7)),
    ],
)
def fv(input_df):
    return transform1(input_df)
A Feature View that calls two pyspark @transformations with chaining
@transformation(mode="pyspark")
def add_impression(input_df):
    from pyspark.sql import functions as F

    return input_df.withColumn("impression", F.lit(1))


@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")
@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="other_vals", function="sum", time_window=timedelta(days=7)),
    ],
)
def fv(input_df):
    return transform1(add_impression(input_df))
A Feature View that calls a pyspark @transformation, which calls a native Python function
def add_impression(input_df):
    from pyspark.sql import functions as F

    return input_df.withColumn("impression", F.lit(1))


@transformation(mode="pyspark")
def transform1(input_df):
    input_df = add_impression(input_df)
    return input_df.select("content_id", "event_timestamp", "other_vals")
@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="other_vals", function="sum", time_window=timedelta(days=7)),
    ],
)
def fv(input_df):
    return transform1(input_df)
A pyspark Feature View that calls two native Python functions
def add_impression(input_df):
    from pyspark.sql import functions as F

    return input_df.withColumn("impression", F.lit(1))


def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")
@batch_feature_view(
    mode="pyspark",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="other_vals", function="sum", time_window=timedelta(days=7)),
    ],
)
def fv(input_df):
    intermediate_df = add_impression(input_df)
    return transform1(intermediate_df)
A pyspark Feature View that calls a native Python function and invokes the pyspark method withColumn
def add_impression(input_df):
    from pyspark.sql import functions as F

    return input_df.withColumn("impression", F.lit(1))


def transform1(input_df):
    input_df = add_impression(input_df)
    return input_df.select("content_id", "event_timestamp", "other_vals")
@batch_feature_view(
    mode="pyspark",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="other_vals", function="sum", time_window=timedelta(days=7)),
    ],
)
def fv(input_df):
    from pyspark.sql import functions as F

    intermediate_df = input_df.withColumn("impression", F.lit(1))
    return transform1(intermediate_df)
A Feature View that calls a pyspark @transformation, passing two pyspark @transformation outputs
@transformation(mode="pyspark")
def add_impression(input_df):
    from pyspark.sql import functions as F

    return input_df.withColumn("impression", F.lit(1))


@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")


@transformation(mode="pyspark")
def pick1(df1, df2):
    return df1
@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="other_vals", function="sum", time_window=timedelta(days=7)),
    ],
)
def fv(input_df):
    return pick1(transform1(input_df), add_impression(input_df))
A pipeline Feature View that calls a spark_sql @transformation, passing two pyspark @transformation outputs
@transformation(mode="pyspark")
def add_impression(input_df):
    from pyspark.sql import functions as F

    return input_df.withColumn("impression", F.lit(1))


@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("content_id", "event_timestamp", "other_vals")


@transformation(mode="spark_sql")
def pick1(df1, df2):
    return f"SELECT * FROM {df1}"
@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="other_vals", function="sum", time_window=timedelta(days=7)),
    ],
)
def fv(input_df):
    return pick1(transform1(input_df), add_impression(input_df))
A str_split implementation that uses two @transformations
In this example, we implement a generic str_split transformation on a specified column, followed by another transformation that calculates some summary statistics for the feature.
Note that passing constants to a transformation requires using const, which can be imported from tecton.
from tecton import transformation, batch_feature_view, const, FilteredSource
from entities import auction
from data_sources.ad_impressions import ad_impressions_batch
from datetime import datetime, timedelta


@transformation(mode="spark_sql")
def str_split(input_data, column_to_split, new_column_name, delimiter):
    return f"""
        SELECT
            *,
            split({column_to_split}, {delimiter}) AS {new_column_name}
        FROM {input_data}
        """


@transformation(mode="spark_sql")
def keyword_stats(input_data, keyword_column):
    return f"""
        SELECT
            auction_id,
            timestamp,
            {keyword_column} AS keyword_list,
            size({keyword_column}) AS num_keywords,
            array_contains({keyword_column}, "bitcoin") AS keyword_contains_bitcoin
        FROM {input_data}
        """
@batch_feature_view(
    mode="pipeline",
    sources=[FilteredSource(ad_impressions_batch)],
    entities=[auction],
    batch_schedule=timedelta(days=1),
    online=True,
    offline=True,
    feature_start_time=datetime(2020, 5, 1),
    ttl=timedelta(days=365),
)
def auction_keywords(ad_impressions):
    split_keywords = str_split(ad_impressions, const("content_keyword"), const("keywords"), const("' '"))
    return keyword_stats(split_keywords, const("keywords"))
FAQ: Why can't I directly invoke PySpark methods on the output from a @transformation?
Transformations are intended to be interchangeable within a supported compute environment. For Spark, this means mode="pyspark" and mode="spark_sql" transformations can be mixed. For example, this is a completely valid pipeline:
@transformation(mode="pyspark")
def transform1(input_df):
    return input_df.select("a", "b", "c")


@transformation(mode="spark_sql")
def select_all(df):
    return f"SELECT * FROM {df}"
@batch_feature_view(
    mode="pipeline",
    sources=[test_source],
    entities=[content],
    feature_start_time=datetime(2022, 2, 1),
    timestamp_field="event_timestamp",
    aggregation_interval=timedelta(days=1),
    aggregations=[
        Aggregation(column="other_vals", function="sum", time_window=timedelta(days=7)),
    ],
)
def fv(input_df):
    return select_all(transform1(input_df))