# Creating Data Sources
Unless noted otherwise, code in this tutorial is to be pasted into and run in a notebook.
In this topic, you will create two data sources that will be used by the features that you create later.
A data source is a Tecton object (e.g., a `BatchSource` or `StreamSource`) that references an external data source, such as a Hive table, CSV file, or Kinesis stream. In general, a Tecton data source maps to a single external table or stream.
## Read data from the external data source
In this tutorial we will create two Batch Sources:

- `transactions`: contains facts about historical customer transactions
- `customers`: contains information about customers, such as their name, city, and address
These sources will reference Parquet files in a public S3 bucket that Tecton manages.

Let's first verify that you can read the `transactions` and `customers` files from your notebook.
### Read from the transactions file

```python
spark.read.parquet("s3://tecton.ai.public/tutorials/fraud_demo/transactions/data.pq").show()
```
Example output:
| user_id | transaction_id | category | amt | is_fraud | merchant | merch_lat | merch_long | timestamp |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| user_884240387242 | 80883eb88afb219c9... | gas_transport | 68.23 | 0 | fraud_Kutch, Herm... | 42.710006 | -78.338644 | 2023-06-20 10:26:41 |
| user_268514844966 | 5fc672e23b9193f97... | misc_pos | 32.98 | 0 | fraud_Lehner, Rei... | 39.153572 | -122.36427 | 2023-06-20 12:57:20 |
| user_722584453020 | 01bddb7a41ce2d16a... | home | 4.5 | 0 | fraud_Koss, Hanse... | 33.033236 | -105.7457 | 2023-06-20 14:49:59 |
### Read from the customers file

```python
spark.read.parquet("s3://tecton.ai.public/tutorials/fraud_demo/customers/data.pq").show()
```
Example output:
| ssn | cc_num | first | last | gender | street | city | state | zip | lat | long | city_pop | job | dob | user_id | signup_timestamp |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 647-66-4497 | 4979481248514730 | Amanda | Brown | F | 3071 Barnes Alley | Minneapolis | MN | 55447 | 45.0033 | -93.4875 | 1022298 | Restaurant manage... | 2003-02-27 | user_709462196403 | 2017-04-06 00:50:31 |
| 156-89-3580 | 6011823734714909 | Jessica | Smith | F | 572 Jennifer Manor | Portage | MI | 49002 | 42.1938 | -85.5639 | 47338 | Publishing rights... | 1989-07-30 | user_687958452057 | 2017-05-08 16:07:51 |
| 777-29-0872 | 213115913848502 | Anthony | Bishop | M | 890 James Orchard... | Edgewood | IL | 62426 | 38.9021 | -88.6645 | 1085 | Technical sales e... | 1990-07-30 | user_884240387242 | 2017-06-15 19:33:18 |
## Define a BatchSource for the transactions file

In your local feature repository, open the `data_sources/transactions.py` file. Then uncomment the following code and save the file:
```python
from tecton import BatchSource, FileConfig

transactions = BatchSource(
    name="transactions",
    batch_config=FileConfig(
        uri="s3://tecton.ai.public/tutorials/fraud_demo/transactions/data.pq",
        file_format="parquet",
        timestamp_field="timestamp",
    ),
)
```
## Register the transactions data source

In your terminal, run `tecton apply` to register the new data source with the workspace you created and selected during setup.

```bash
tecton apply
```
You should see the following output:

```
✅ Collecting local feature declarations
✅ Performing server-side feature validation: : Initializing.
 ↓↓↓↓↓↓↓↓↓↓↓↓ Plan Start ↓↓↓↓↓↓↓↓↓↓
  + Create BatchDataSource
    name: transactions
 ↑↑↑↑↑↑↑↑↑↑↑↑ Plan End ↑↑↑↑↑↑↑↑↑↑↑↑
Generated plan ID is <plan ID>
View your plan in the Web UI: <Web UI URL>
Are you sure you want to apply this plan to: "<workspace name>"? [y/N]>
```
Hit `y` to apply your new data source definition.
If you navigate to your Tecton Web UI and select your new workspace, you will see that this data source has been registered.
You can view the plan of potential changes to a workspace using `tecton plan`. `tecton apply` will first show you a plan before asking whether you want to apply the changes.

You can list the workspaces in your account using `tecton workspace list`, and select the workspace to apply to using `tecton workspace select <workspace-name>`.
## Test the transactions data source

Now that the data source has been defined, let's make sure we can read from it through Tecton. First, get the `transactions` data source from the workspace, then call its `get_dataframe()` method to retrieve data:
```python
from datetime import datetime

# `ws` is the workspace handle you obtained during setup.
transactions_ds = ws.get_data_source("transactions")
transactions_df = transactions_ds.get_dataframe(
    start_time=datetime(2022, 1, 1), end_time=datetime(2022, 2, 1)
).to_spark()
transactions_df.show()
```
Example output:
| user_id | transaction_id | category | amt | is_fraud | merchant | merch_lat | merch_long | timestamp |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| user_26990816968 | 5c592b874a917729eb78360be126509b | grocery_pos | 111.22 | 0 | fraud_Kiehn Inc | 33.1858 | -91.5272 | 2022-01-01 01:49:15 |
| user_871233292771 | 934f4ee1c7d43b6ae1b6ce8359207e0c | personal_care | 4.58 | 0 | fraud_Hahn, Bahringer and McLaughlin | 38.5728 | -83.6879 | 2022-01-01 02:08:46 |
| user_650387977076 | d6ad03866e795a8bf9f73a38820a27e1 | grocery_pos | 52.88 | 0 | fraud_Heidenreich PLC | 36.2717 | -122.197 | 2022-01-01 02:27:18 |
`get_dataframe()` has two optional parameters, `start_time` and `end_time`, which filter the data read from the source. This filtering relies on the `timestamp_field` (and `partition_columns`, in the case of a Hive source).

Calling `get_dataframe()` without `start_time` and `end_time` on a data source that has a `timestamp_field` set may result in slow queries for large data sources.
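Conceptually, the filter keeps only rows whose `timestamp_field` value falls within the requested window. The sketch below illustrates this with a few made-up rows in plain Python (not the real dataset, and a half-open interval `[start_time, end_time)` is assumed; Tecton performs the actual filtering when it reads the source):

```python
from datetime import datetime

# Toy records standing in for rows of the transactions source (illustrative only).
rows = [
    {"transaction_id": "a", "timestamp": datetime(2021, 12, 31, 23, 59)},
    {"transaction_id": "b", "timestamp": datetime(2022, 1, 15, 8, 30)},
    {"transaction_id": "c", "timestamp": datetime(2022, 2, 1, 0, 0)},
]

start_time = datetime(2022, 1, 1)
end_time = datetime(2022, 2, 1)

# Keep rows whose timestamp falls within [start_time, end_time).
filtered = [r for r in rows if start_time <= r["timestamp"] < end_time]

print([r["transaction_id"] for r in filtered])  # only "b" is in the window
```

Only the second row survives: the first falls before `start_time` and the third sits at `end_time`, outside the half-open window.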
All Tecton methods that return a DataFrame return a "Tecton DataFrame", which can be converted to platform-specific DataFrame types using `to_pandas()` or `to_spark()`.
## Define a BatchSource for the customers file

In your local feature repository, open the `data_sources/customers.py` file. Then uncomment the following code and save the file:
```python
from tecton import BatchSource, FileConfig

customers = BatchSource(
    name="customers",
    batch_config=FileConfig(
        uri="s3://tecton.ai.public/tutorials/fraud_demo/customers/data.pq",
        file_format="parquet",
        timestamp_field="signup_timestamp",
    ),
)
```
## Register the customers data source

In your terminal, run `tecton apply` to register the new data source.

```bash
tecton apply
```
## Test the customers data source

Use the `get_dataframe()` method again to retrieve data:
```python
customers_ds = ws.get_data_source("customers")
customers_df = customers_ds.get_dataframe(
    start_time=datetime(2017, 1, 1), end_time=datetime(2022, 1, 1)
).to_spark()
customers_df.show()
```
Example output:
| ssn | cc_num | first | last | gender | street | city | state | zip | lat | long | city_pop | job | dob | user_id | signup_timestamp |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 647-66-4497 | 4979481248514730 | Amanda | Brown | F | 3071 Barnes Alley | Minneapolis | MN | 55447 | 45.0033 | -93.4875 | 1022298 | Restaurant manager, fast food | 2003-02-27 | user_709462196403 | 2017-04-06 00:50:31 |
| 156-89-3580 | 6011823734714909 | Jessica | Smith | F | 572 Jennifer Manor | Portage | MI | 49002 | 42.1938 | -85.5639 | 47338 | Publishing rights manager | 1989-07-30 | user_687958452057 | 2017-05-08 16:07:51 |
| 777-29-0872 | 213115913848502 | Anthony | Bishop | M | 890 James Orchard Suite 993 | Edgewood | IL | 62426 | 38.9021 | -88.6645 | 1085 | Technical sales engineer | 1990-07-30 | user_884240387242 | 2017-06-15 19:33:18 |