tecton.FileConfig
Summary
Configuration used to reference a file or directory (S3, etc.)
The FileConfig class is used to create a reference to a file or directory of files in S3, HDFS, or DBFS.
The schema of the data source is inferred from the underlying file(s). It can also be modified using the post_processor parameter.
This class is used as the batch_config parameter of a BatchSource. Declaring this configuration class alone will not register a Data Source. Instead, declare it as part of a BatchSource that takes this configuration class instance as a parameter.
Example
from tecton import FileConfig, BatchSource


def convert_temperature(df):
    from pyspark.sql.functions import udf, col
    from pyspark.sql.types import DoubleType

    # Convert the incoming PySpark DataFrame temperature from Celsius to Fahrenheit
    udf_convert = udf(lambda x: x * 1.8 + 32.0, DoubleType())
    converted_df = df.withColumn("Fahrenheit", udf_convert(col("Temperature"))).drop("Temperature")
    return converted_df


# Declare a FileConfig, which can be used as a parameter to a `BatchSource`
ad_impressions_file_ds = FileConfig(
    uri="s3://tecton.ai.public/data/ad_impressions_sample.parquet",
    file_format="parquet",
    timestamp_field="timestamp",
    post_processor=convert_temperature,
)

# This FileConfig can then be included as a parameter of a BatchSource declaration.
# For example,
ad_impressions_batch = BatchSource(name="ad_impressions_batch", batch_config=ad_impressions_file_ds)
If your files are partitioned, provide the path to the root folder. For example: uri = "s3://<bucket-name>/<root-folder>/"
The FileConfig class will use Spark partition discovery to find all partitions and infer the schema.
When reading a highly-partitioned file, Tecton recommends setting the schema_uri parameter to speed up schema inference. For more details, review the Tecton documentation.
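As a rough illustration of how uri and schema_uri relate for a partitioned source, the sketch below assembles the keyword arguments for a FileConfig. The helper name, the bucket, and the partition layout are hypothetical and chosen only for this example; the one constraint taken from this page is that schema_uri should be a file or subpath under uri.

```python
def build_partitioned_config_kwargs(root_uri: str, schema_subpath: str) -> dict:
    """Assemble FileConfig keyword arguments for a partitioned dataset.

    Hypothetical helper for illustration only; it is not part of the Tecton SDK.
    """
    # Normalise the root folder so it ends with exactly one slash.
    root = root_uri.rstrip("/") + "/"
    return {
        "uri": root,
        "file_format": "parquet",
        "timestamp_field": "timestamp",
        # schema_uri points at a single file (or subpath) under the root folder,
        # so the schema can be inferred without listing every partition.
        "schema_uri": root + schema_subpath.lstrip("/"),
    }


# Bucket name and partition layout are assumptions made up for this sketch.
kwargs = build_partitioned_config_kwargs(
    "s3://example-bucket/ad_impressions",
    "date=2023-01-01/part-00000.parquet",
)

# In a Tecton feature repository, these would be passed as FileConfig(**kwargs).
```

Pointing schema_uri at one representative file keeps schema inference cheap, but it assumes every partition shares that file's schema.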
Attributes
data_delay: This attribute is the same as the data_delay parameter of the __init__ method. See below.
Methods
__init__(...)
Instantiates a new FileConfig.
Parameters
- uri (str): S3, HDFS, or DBFS path to file(s).
- file_format (str): File format. Possible values are json, parquet, or csv.
- timestamp_field (Optional[str]): The timestamp column in this data source that should be used by FilteredSource to filter data from this source, before any feature view transformations are applied. Only required if this source is used with FilteredSource. (Default: None)
- timestamp_format (Optional[str]): Format of the string-encoded timestamp column (e.g. "yyyy-MM-dd'T'hh:mm:ss.SSS'Z'"). If the timestamp string cannot be parsed with this format, Tecton will fall back and attempt to use the default timestamp parser. (Default: None)
- post_processor (Optional[Callable]): Python user-defined function f(DataFrame) -> DataFrame that takes in the raw PySpark data source DataFrame and translates it to the DataFrame to be consumed by the Feature View. (Default: None)
- schema_uri (Optional[str]): A file or subpath of "uri" that can be used for fast schema inference. This is useful for speeding up plan computation for highly partitioned data sources containing many files. (Default: None)
- schema_override (Optional[StructType]): A pyspark.sql.types.StructType object that will be used as the schema when reading from the file. If omitted, the schema will be inferred automatically. (Default: None)
- data_delay (timedelta): By default, incremental materialization jobs run immediately at the end of the batch schedule period. This parameter configures how long they wait after the end of the period before starting, typically to ensure that all data has landed. For example, if a feature view has a batch_schedule of 1 day and one of the data source inputs has data_delay=timedelta(hours=1) set, then incremental materialization jobs will run at 01:00 UTC. (Default: datetime.timedelta(0))
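The data_delay arithmetic described above can be sketched in plain Python. This is only an illustration of the timing rule stated on this page, not Tecton's scheduler; the function name incremental_job_start and the sample date are assumptions made for the example.

```python
from datetime import datetime, timedelta, timezone


def incremental_job_start(period_start, batch_schedule, data_delay=timedelta(0)):
    """Earliest start time of the job that processes the period beginning at period_start.

    Hypothetical helper: the job waits until the period ends, then waits data_delay.
    """
    period_end = period_start + batch_schedule
    return period_end + data_delay


# A daily batch period starting at midnight UTC (the date is arbitrary).
period_start = datetime(2023, 1, 1, tzinfo=timezone.utc)

start = incremental_job_start(
    period_start,
    batch_schedule=timedelta(days=1),
    data_delay=timedelta(hours=1),
)
print(start.isoformat())  # 2023-01-02T01:00:00+00:00
```

With the default data_delay of timedelta(0), the same call returns midnight UTC, which matches the "run immediately at the end of the batch schedule period" behavior.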
Returns
A FileConfig class instance.