Using Third Party Dependencies in PySpark Transformations
Overview​
You can use third party Python packages in UDFs used by PySpark transformations
(mode="pyspark"
) by declaring them in extra_pip_dependencies
within the
cluster config object, that is either tecton.EMRClusterConfig
or
tecton.DatabricksClusterConfig
. An example follows:
new_emr = EMRClusterConfig(
instance_type="m4.xlarge",
number_of_workers=4,
extra_pip_dependencies=["tensorflow==2.2.0"],
)
In extra_pip_dependencies
, you can also specify Amazon S3 URIs, to import your
own modules. Ensure that the Tecton Spark IAM role or your Tecton Databricks
instance profile has access to read from this S3 URI.
To use your own module, package it as a Python wheel or egg file in S3.
Using a third party dependency inside of a Feature View transformation​
In this example we're using an imported function log1p
from Tensorflow in a
UDF that is defined inside of a PySpark Feature View transformation.
Note that the import statement has to be within the UDF body; placing it at the top of the file or inside transformation function will not work.
from datetime import datetime, timedelta
from tecton import batch_feature_view, EMRClusterConfig, FilteredSource
from shared import entities, data_sources
new_emr = EMRClusterConfig(
instance_type="m4.xlarge",
number_of_workers=4,
extra_pip_dependencies=["tensorflow==2.2.0"],
)
@batch_feature_view(
mode="pyspark",
sources=[FilteredSource(source=user_clicks)],
entities=[entities.product],
online=False,
offline=False,
feature_start_time=datetime(year=2021, month=1, day=1),
batch_schedule=timedelta(days=1),
ttl=timedelta(days=30),
batch_compute=new_emr,
)
def my_example_feature_view(user_clicks):
from pyspark.sql import functions as F
from pyspark.sql.types import LongType
def my_udf(x):
import tensorflow as tf
return int(tf.math.log1p(float(x)).numpy())
extra_udf = F.udf(my_udf, LongType())
return user_clicks.select(
"product_id",
extra_udf("clicked").alias("log1p_clicked"),
F.to_utc_timestamp(F.col("timestamp"), "UTC").alias("timestamp"),
)