Skip to main content
Version: 0.6

Using Third Party Dependencies in PySpark Transformations

Overview​

You can use third party Python packages in UDFs used by PySpark transformations (mode="pyspark") by declaring them in extra_pip_dependencies within the cluster config object, that is either tecton.EMRClusterConfig or tecton.DatabricksClusterConfig. An example follows:

new_emr = EMRClusterConfig(
instance_type="m4.xlarge",
number_of_workers=4,
extra_pip_dependencies=["tensorflow==2.2.0"],
)

In extra_pip_dependencies, you can also specify Amazon S3 URIs, to import your own modules. Ensure that the Tecton Spark IAM role or your Tecton Databricks instance profile has access to read from this S3 URI.

To use your own module, package it as a Python wheel or egg file in S3.

Using a third party dependency inside of a Feature View transformation​

In this example we're using an imported function log1p from Tensorflow in a UDF that is defined inside of a PySpark Feature View transformation.

Note that the import statement has to be within the UDF body; placing it at the top of the file or inside transformation function will not work.

from datetime import datetime, timedelta
from tecton import batch_feature_view, EMRClusterConfig, FilteredSource
from shared import entities, data_sources

new_emr = EMRClusterConfig(
instance_type="m4.xlarge",
number_of_workers=4,
extra_pip_dependencies=["tensorflow==2.2.0"],
)


@batch_feature_view(
mode="pyspark",
sources=[FilteredSource(source=user_clicks)],
entities=[entities.product],
online=False,
offline=False,
feature_start_time=datetime(year=2021, month=1, day=1),
batch_schedule=timedelta(days=1),
ttl=timedelta(days=30),
batch_compute=new_emr,
)
def my_example_feature_view(user_clicks):
from pyspark.sql import functions as F
from pyspark.sql.types import LongType

def my_udf(x):
import tensorflow as tf

return int(tf.math.log1p(float(x)).numpy())

extra_udf = F.udf(my_udf, LongType())

return user_clicks.select(
"product_id",
extra_udf("clicked").alias("log1p_clicked"),
F.to_utc_timestamp(F.col("timestamp"), "UTC").alias("timestamp"),
)

Was this page helpful?

🧠 Hi! Ask me anything about Tecton!

Floating button icon