tecton apply Runs Slowly for FileConfigs with Partitioned S3 Data
Scope
This article applies to a BatchSource for partitioned S3 data configured using a FileConfig. It covers Parquet, CSV, and JSON files.
Cause
The likely cause of tecton apply running slowly is Spark's partition discovery mechanism, which is slow when inferring schemas for S3 prefixes with many partitions. Every time a new data source is added, tecton apply needs to invoke Spark to obtain the schema from a DataFrame. For a partitioned data source, Spark will recurse through every partition nested beneath the S3 URI you provide. Read more about Spark partition discovery in the Spark SQL documentation.
For example, suppose you have some Parquet data in S3:
from tecton import FileConfig, BatchSource

transactions_ds = FileConfig(
    uri="s3://my-data/transactions/",
    file_format="parquet",
    timestamp_field="timestamp",
)
transactions_batch = BatchSource(name="transactions_batch", batch_config=transactions_ds)
For the data source above, Spark will recursively perform a list operation on every partition under s3://my-data/transactions/. If, for example, the data is partitioned by year, month, and day, then two years of data will require Spark to perform a list operation on 2 years x 12 months x approx. 30 days = approx. 720 partitions. If the data is also partitioned by hour, this increases to 720 x 24 = 17,280 partitions.
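The partition counts above can be checked with a quick back-of-the-envelope calculation (a minimal sketch; the 30-day month is an approximation):

```python
# Approximate number of partition directories Spark must list,
# assuming 2 years of data and ~30 days per month.
years = 2
months_per_year = 12
days_per_month = 30  # approximation

daily_partitions = years * months_per_year * days_per_month
print(daily_partitions)  # 720

# Adding an hour=... partition level multiplies the count by 24.
hourly_partitions = daily_partitions * 24
print(hourly_partitions)  # 17280
```

Each of these directories costs at least one S3 list request during schema inference, which is why discovery time grows with partition granularity.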
You can verify this slow behavior by simply running spark.read followed by printSchema() in a notebook:
spark.read.parquet("s3://my-data/transactions/").printSchema()
Try this for the S3 prefix you're finding slow, replacing spark.read.parquet with spark.read.json or spark.read.csv as needed.
Resolution
Option 1: Use an AWS Glue catalog table
Connect your Tecton instance to AWS Glue and create a Glue table that points to your data in S3. Since Spark is reading the data through a metastore, the schema will be instantly available to Spark without recursing through data in S3.
This option is recommended for any data that is depended upon in production use cases since Glue enforces a strong schema contract.
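With a Glue table in place, the data source can be defined against the catalog rather than the raw S3 prefix. A minimal sketch using Tecton's HiveConfig, assuming a Glue database named my_glue_db with a table named transactions pointing at the S3 data (both names are hypothetical):

```python
# Sketch: define the batch source against a Glue catalog table instead of
# an S3 prefix, so Spark fetches the schema from the metastore rather
# than recursively listing partitions. Database and table names below
# are hypothetical placeholders.
from tecton import HiveConfig, BatchSource

transactions_ds = HiveConfig(
    database="my_glue_db",
    table="transactions",
    timestamp_field="timestamp",
)
transactions_batch = BatchSource(name="transactions_batch", batch_config=transactions_ds)
```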
Option 2: Decrease the granularity of partitions
Do you need a high granularity of partitions for the volume of your data? If the total data volume is low, consider using fewer partitions. Fewer partitions will make all spark.read operations faster since there will be fewer directories to recurse through.
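If you decide to coarsen the layout, a one-off Spark job can rewrite the data with fewer partition levels. A sketch under the assumptions that a Spark session is available, the data has year, month, and day partition columns, and the output path is a new, hypothetical prefix:

```python
# Sketch: rewrite daily-partitioned data into monthly partitions.
# Paths, column names, and the output prefix are illustrative.
df = spark.read.parquet("s3://my-data/transactions/")

(
    df.write
    .partitionBy("year", "month")   # drop the day=... partition level
    .mode("overwrite")
    .parquet("s3://my-data/transactions_monthly/")
)
```

After the rewrite, point the FileConfig uri at the new prefix; schema inference then has far fewer directories to list.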
Option 3: Specify schema_uri to use a single file for schema inference
FileConfig supports a schema_uri field, which allows you to point to a single file within your S3 data that will be used as the source of truth for its schema. For example, suppose your data is located in s3://my-data/transactions/ and is partitioned by year, month, and day (e.g. s3://my-data/transactions/year=2022/month=11/day=23/). Specifying a single file with schema_uri will cause Spark to infer the data schema from that file alone.
from tecton import FileConfig, BatchSource

SCHEMA_FILE_URI = "s3://my-data/transactions/year=2022/month=11/day=23/67d652.parquet"

transactions_ds = FileConfig(
    uri="s3://my-data/transactions/",
    schema_uri=SCHEMA_FILE_URI,
    file_format="parquet",
    timestamp_field="timestamp",
)
transactions_batch = BatchSource(name="transactions_batch", batch_config=transactions_ds)
Internally, this causes Spark to infer the schema using the "basePath" option, which tells Spark to treat the partition subpaths between "basePath" and the specified URI as schema columns:
spark.read \
    .option("basePath", "s3://my-data/transactions/") \
    .parquet("s3://my-data/transactions/year=2022/month=11/day=23/67d652.parquet")
Because "basePath" is set, the partition columns (year, month, and day) still appear in the inferred schema.