Connecting to a Kafka Stream Data Source
Tecton can use Kafka as a data source for feature materialization. Connecting to Kafka requires setting up authentication and, if using Amazon MSK, establishing Virtual Private Cloud (VPC) connectivity.
Establish network connectivity
For Amazon MSK
Because your data platform (Databricks or EMR) resides in a different Amazon VPC from the one where Amazon MSK resides, you will need to configure access between the two VPCs.
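One common option is VPC peering. The following is a minimal sketch using boto3; the VPC IDs are placeholders, and you will also need to update route tables and security groups on both sides (not shown).

# A minimal sketch of peering the data platform VPC with the MSK VPC using
# boto3. VPC IDs are placeholders; route tables and security groups must also
# be updated on both sides (not shown).
import boto3

ec2 = boto3.client("ec2")

# Request a peering connection from the Databricks/EMR VPC to the MSK VPC.
peering = ec2.create_vpc_peering_connection(
    VpcId="vpc-xxxxxxxx",      # data platform VPC (placeholder)
    PeerVpcId="vpc-yyyyyyyy",  # Amazon MSK VPC (placeholder)
)

# Accept the request (run this in the account that owns the MSK VPC).
ec2.accept_vpc_peering_connection(
    VpcPeeringConnectionId=peering["VpcPeeringConnection"]["VpcPeeringConnectionId"]
)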
For Confluent
See the Confluent documentation for the available options for establishing network connectivity.
Configure authentication
Tecton can connect to Kafka using TLS or SASL.
TLS authentication
Configuring TLS (SSL) authentication requires a keystore and, optionally, a truststore.
Set up a keystore and truststore​
To set up a keystore, upload your keystore file (a .jks
file) to either S3 or
DBFS (Databricks only) and set the Tecton ssl_keystore_location
parameter of a
KafkaConfig
object.
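For example, assuming the keystore is stored in S3, you could upload it with boto3 and reference the resulting path in KafkaConfig; the bucket and file paths below are placeholders.

# A minimal sketch: upload the keystore to S3 and point KafkaConfig at it.
# The bucket name and file paths are placeholders.
import boto3

s3 = boto3.client("s3")
s3.upload_file(
    "kafka_client_keystore.jks",  # local keystore file
    "my-bucket",                  # placeholder bucket
    "kafka-credentials/kafka_client_keystore.jks",
)
# Then, in your KafkaConfig:
#   ssl_keystore_location="s3://my-bucket/kafka-credentials/kafka_client_keystore.jks"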
If accessing the keystore file requires a password, follow these steps, based on your data platform:
Databricks​
- Create a secret using the scope you created in
Connecting Databricks.
The secret key name must begin with
SECRET_
. - In your
KafkaConfig
object, setssl_keystore_password_secret_id
to the secret key name you created in the first step.
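For example, the secret in step 1 can be created with the Databricks Secrets API; the workspace URL, token, scope, and key names below are placeholders.

# A minimal sketch of creating a Databricks secret via the Secrets API 2.0.
# The workspace URL, token, scope, and key names are placeholders; the key
# name must begin with SECRET_.
import os
import requests

resp = requests.post(
    "https://my-workspace.cloud.databricks.com/api/2.0/secrets/put",
    headers={"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"},
    json={
        "scope": "tecton-kafka",  # placeholder scope
        "key": "SECRET_KAFKA_KEYSTORE_PASSWORD",
        "string_value": os.environ["KEYSTORE_PASSWORD"],
    },
)
resp.raise_for_status()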
EMR
1. In AWS Secrets Manager, create a secret key having the format <prefix>/SECRET_<rest of the secret name> (see the example below this list), where:
   - <prefix> is <deployment name> if your deployment name begins with tecton, and tecton-<deployment name> otherwise
   - <deployment name> is the first part of the URL used to access the Tecton UI (https://<deployment name>.tecton.ai)
   - <rest of the secret name> is a string of your choice
2. In your KafkaConfig object, set ssl_keystore_password_secret_id to the secret key name you created in the first step.
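As a sketch, the secret in step 1 can be created with boto3. Here the deployment name is assumed to be mycompany, which does not begin with tecton, so the prefix is tecton-mycompany; the key name and password source are placeholders.

# A minimal sketch of creating the secret in AWS Secrets Manager with boto3.
# "mycompany" is a placeholder deployment name that does not begin with
# "tecton", so the prefix is "tecton-mycompany".
import os
import boto3

secretsmanager = boto3.client("secretsmanager")
secretsmanager.create_secret(
    Name="tecton-mycompany/SECRET_KAFKA_KEYSTORE_PASSWORD",
    SecretString=os.environ["KEYSTORE_PASSWORD"],
)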
To set up a truststore, upload your truststore file (a .jks file) to either S3 or DBFS (Databricks only) and set the Tecton ssl_truststore_location parameter of a KafkaConfig object.
If accessing the truststore file requires a password, follow these steps, based on your data platform:
Databricks
1. Create a secret using the scope you created in Connecting Databricks. The secret key name must begin with SECRET_.
2. In your KafkaConfig object, set ssl_truststore_password_secret_id to the secret key name you created in the first step.
EMR
1. In AWS Secrets Manager, create a secret key having the format <prefix>/SECRET_<rest of the secret name>, where:
   - <prefix> is <deployment name> if your deployment name begins with tecton, and tecton-<deployment name> otherwise
   - <deployment name> is the first part of the URL used to access the Tecton UI (https://<deployment name>.tecton.ai)
   - <rest of the secret name> is a string of your choice
2. In your KafkaConfig object, set ssl_truststore_password_secret_id to the secret key name you created in the first step.
A truststore is required in the following cases:
- Kafka configured with a custom Certificate Authority
- Using Amazon MSK with Databricks
In most other cases, a truststore is optional.
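For the Amazon MSK case, a common approach (described in the AWS MSK client setup documentation) is to build the truststore from the JVM's default CA bundle. The JDK path below is an assumption; adjust it to your environment.

# A minimal sketch: build amazon_truststore.jks from the JVM's default CA
# bundle, a common approach for MSK clients. The JDK path is an assumption;
# adjust it to your environment, then upload the file to S3 or DBFS as
# described above.
import shutil

jvm_cacerts = "/usr/lib/jvm/java-11-openjdk/lib/security/cacerts"  # assumed path
shutil.copy(jvm_cacerts, "amazon_truststore.jks")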
Notes on keystore/truststore files (Databricks)
You can store keystore and truststore files in any location. The following are example locations (these are set as parameters in the KafkaConfig object):
- ssl_keystore_location: dbfs:/kafka-credentials/kafka_client_keystore.jks
- ssl_truststore_location: dbfs:/kafka-credentials/amazon_truststore.jks
Notes on keystore/truststore files (EMR)
You must store your keystore and/or truststore files in the location s3://tecton-<deployment name>/kafka-credentials. Here, <deployment name> is your deployment name, the first part of the hostname of your Tecton Web UI; if your cluster is mycompany.tecton.ai, then <deployment name> is mycompany.
At materialization time, Tecton will download all of the files in this location to the materialization cluster.
The ssl_keystore_location and ssl_truststore_location parameters in the KafkaConfig object are as follows:
- ssl_keystore_location: s3://tecton-<deployment name>/kafka-credentials/kafka_client_keystore.jks
- ssl_truststore_location: s3://tecton-<deployment name>/kafka-credentials/amazon_truststore.jks
You will also need to add the following bootstrap script to your cluster: s3://tecton.ai.public/install_scripts/setup_emr_notebook_cluster_copy_kafka_credentials.sh. Pass the following argument as input: s3://tecton-<deployment name>. The script will look in the /kafka-credentials folder inside that S3 bucket.
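If you create EMR clusters programmatically, the bootstrap action might be attached as in the following sketch using boto3; the instance, role, and release settings are placeholders, and mycompany is a placeholder deployment name.

# A minimal sketch of attaching the Tecton bootstrap script to an EMR cluster
# with boto3. Instance, role, and release settings are placeholders.
import boto3

emr = boto3.client("emr")
emr.run_job_flow(
    Name="tecton-materialization",  # placeholder cluster name
    ReleaseLabel="emr-6.9.0",       # placeholder release
    Instances={
        "MasterInstanceType": "m5.xlarge",
        "InstanceCount": 1,
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
    BootstrapActions=[
        {
            "Name": "copy-kafka-credentials",
            "ScriptBootstrapAction": {
                "Path": "s3://tecton.ai.public/install_scripts/setup_emr_notebook_cluster_copy_kafka_credentials.sh",
                "Args": ["s3://tecton-mycompany"],  # placeholder deployment name
            },
        }
    ],
)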
Resources
The following resources are helpful in understanding how to configure TLS:
- Encrypt and Authenticate with TLS in the Confluent documentation.
- Mutual TLS Authentication in the Amazon documentation.
Specify your Kafka configuration in Tecton
The following KafkaConfig definition shows the parameters that need to be set to configure a Tecton connection to Kafka using TLS. ssl_keystore_password_secret_id, ssl_truststore_location, and ssl_truststore_password_secret_id may not be required; see Set up a keystore and truststore, above.
import os

from tecton import KafkaConfig

stream_config = KafkaConfig(
    kafka_bootstrap_servers=os.environ["bootstrap_servers"],
    topics="<Kafka topic name(s)>",
    ssl_keystore_location=os.environ["ssl_keystore_location"],
    ssl_keystore_password_secret_id=os.environ["ssl_keystore_password_secret_id"],
    ssl_truststore_location=os.environ["ssl_truststore_location"],
    ssl_truststore_password_secret_id=os.environ["ssl_truststore_password_secret_id"],
    security_protocol="SSL",
    # <other parameters>
)
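Once defined, the stream_config is typically attached to a stream source. The following is a sketch; the source name and batch source are placeholders, and the exact StreamSource signature may vary by Tecton SDK version.

# A minimal sketch of attaching the Kafka stream_config to a Tecton
# StreamSource. The name and batch source are placeholders, and the exact
# signature may vary by SDK version.
from tecton import HiveConfig, StreamSource

transactions_stream = StreamSource(
    name="transactions_stream",  # placeholder name
    stream_config=stream_config,
    batch_config=HiveConfig(database="my_db", table="transactions"),  # placeholder
)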
SASL authentication
A Tecton connection to Kafka using SASL is configured through Java Authentication and Authorization Service (JAAS) settings.
For information on JAAS and its supported SASL authentication mechanisms, see this page in the Confluent documentation.
The following KafkaConfig definition shows the parameters that need to be set to configure a Tecton connection to Kafka using the SASL PLAIN mechanism. To use a different mechanism, modify the settings returned by the get_kafka_settings() function.
import os

from tecton import KafkaConfig


def get_sasl_username():
    return os.environ["sasl_username"]


def get_sasl_password():
    return os.environ["sasl_password"]


def get_kafka_settings():
    # Build the JAAS configuration for the SASL PLAIN mechanism.
    return {
        "kafka.sasl.jaas.config": (
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            f'username="{get_sasl_username()}" password="{get_sasl_password()}";'
        ),
        "kafka.sasl.mechanism": "PLAIN",
    }


stream_config = KafkaConfig(
    kafka_bootstrap_servers=os.environ["bootstrap_servers"],
    topics="<Kafka topic name(s)>",
    options=get_kafka_settings(),
    security_protocol="SASL_SSL",
)
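To use SCRAM instead of PLAIN, for example, the settings might look like the following sketch; the credential helpers reuse the functions above, and the choice of SCRAM-SHA-512 is an assumption about your broker configuration.

# A minimal sketch of SASL/SCRAM settings in place of PLAIN. The mechanism
# (SCRAM-SHA-512) is an assumption about your broker configuration; the
# credential helpers are the functions defined above.
def get_scram_kafka_settings():
    return {
        "kafka.sasl.jaas.config": (
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
            f'username="{get_sasl_username()}" password="{get_sasl_password()}";'
        ),
        "kafka.sasl.mechanism": "SCRAM-SHA-512",
    }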