pandas_batch_config
Summary
Declare a `PandasBatchConfig` for configuring a Batch Source with a Data Source Function. The function takes an optional `FilterContext` if `supports_time_filtering=True`, and returns a `pandas.DataFrame`.
Parameters
- `data_delay` (`Optional[timedelta]`) – By default, incremental materialization jobs run immediately at the end of the batch schedule period. This parameter configures how long they wait after the end of the period before starting, typically to ensure that all data has landed. For example, if a feature view has a `batch_schedule` of 1 day and one of the data source inputs has `data_delay=timedelta(hours=1)` set, then incremental materialization jobs will run at 01:00 UTC. (Default: `datetime.timedelta(0)`)
- `supports_time_filtering` (`bool`) – Must be set to `True` if either of the following conditions is met:
  - `<data source>.get_dataframe()` is called with `start_time` or `end_time`.
  - A feature view wraps this Data Source with a `FilteredSource`.

  If this parameter is set to `True`, Tecton passes a `FilterContext` object into the Data Source Function, which is expected to handle its own filtering. (Default: `False`)
- `secrets` (`Optional[Dict[str, Union[Secret, str]]]`) – A dictionary of Secret references that will be resolved and provided to the Data Source Function at runtime. During local development and testing, strings may be used instead of Secret references.
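To illustrate how these parameters fit together, the following sketch combines `data_delay` and `supports_time_filtering` in one Data Source Function. The S3 path and the `timestamp` column name are assumptions for illustration, not part of any real schema; the `FilterContext` bound handling follows the contract described above, where either bound may be absent.

```python
from datetime import timedelta

import pandas as pd

from tecton import pandas_batch_config


@pandas_batch_config(
    data_delay=timedelta(hours=1),  # wait 1 hour after the period ends before materializing
    supports_time_filtering=True,  # Tecton will pass a FilterContext into the function
)
def events_data_source_function(filter_context):
    # Hypothetical source path and timestamp column, for illustration only.
    df = pd.read_parquet("s3://bucket/events.pq")
    # With supports_time_filtering=True, the function is expected to apply
    # the FilterContext bounds itself; either bound may be None.
    if filter_context:
        if filter_context.start_time is not None:
            df = df[df["timestamp"] >= filter_context.start_time]
        if filter_context.end_time is not None:
            df = df[df["timestamp"] < filter_context.end_time]
    return df
```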
Example
The following example shows how to use `pandas_batch_config` to read from a file in S3. This example uses Tecton Secrets to securely authenticate to a data source that requires a password or API key. If your data source supports AWS IAM authentication, then you can instead leverage an approach similar to the one shown here.
from tecton import pandas_batch_config, Secret


@pandas_batch_config(
    secrets={
        "aws_access_key": Secret(scope="dev", key="aws_access_key"),
        "aws_secret_key": Secret(scope="dev", key="aws_secret_key"),
    }
)
def parquet_data_source_function(secrets):
    import pyarrow.parquet as pq
    from pyarrow.fs import S3FileSystem

    # Suppose you need to pass a specific key pair to access a bucket.
    s3fs = S3FileSystem(access_key=secrets["aws_access_key"], secret_key=secrets["aws_secret_key"])
    dataset = pq.read_table("s3://bucket/pa../../../data.pq", filesystem=s3fs).to_pandas()
    return dataset
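The time-filtering contract that a Data Source Function takes on under `supports_time_filtering=True` can be seen in isolation with plain pandas, no Tecton required. This sketch assumes a half-open `[start_time, end_time)` window, a common convention for time-partitioned reads (confirm bound exclusivity against the `FilterContext` documentation); the column name and data are made up for illustration.

```python
import pandas as pd


def filter_by_time(df, start_time=None, end_time=None):
    # Mirrors what a Data Source Function does with a FilterContext:
    # apply whichever bounds are provided, leaving the rest of the data intact.
    if start_time is not None:
        df = df[df["timestamp"] >= start_time]
    if end_time is not None:
        df = df[df["timestamp"] < end_time]
    return df


events = pd.DataFrame(
    {
        "timestamp": pd.to_datetime(["2023-01-01", "2023-01-02", "2023-01-03"]),
        "value": [1, 2, 3],
    }
)

window = filter_by_time(
    events,
    start_time=pd.Timestamp("2023-01-01"),
    end_time=pd.Timestamp("2023-01-03"),
)
# window keeps the rows for 2023-01-01 and 2023-01-02 only
```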