Kafka Producer
The Kafka Producer step allows you to publish messages in near real time across worker nodes, where multiple subscribed members have access. A Kafka Producer step publishes a stream of records to one Kafka topic.
General
Enter the following information in the transformation step name field.
- Step name: Specifies the unique name of the transformation step on the canvas. The step name is set to Kafka Producer by default.
Options
The Kafka Producer step features a Setup tab for the Kafka connection and an Options tab for configuration properties. Each tab is described below.
Setup tab
Fill in the following fields.
| Option | Description |
|--------|-------------|
| Connection | Select a connection type. The procedures in this article use the Direct connection type, which requires a Bootstrap servers URL. |
| Client ID | The unique client identifier, used to identify and set up a durable connection path to the server to make requests and to distinguish between different clients. |
| Topic | The category to which records are published. |
| Key Field | In Kafka, all messages can be keyed, allowing for messages to be distributed to partitions based on their keys in a default routing scheme. If no key is present, messages are randomly distributed to partitions. A sketch of this routing follows the table. |
| Message Field | The individual record contained in a topic. |
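The key routing described for Key Field is standard Kafka producer behavior. A minimal sketch using the standard Kafka Java client (the broker address, client ID, and topic name are illustrative): records sharing a key always land on the same partition, while unkeyed records are spread across partitions.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyRoutingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");      // illustrative broker
        props.put("client.id", "pdi-kafka-producer");        // illustrative client ID
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Keyed record: the default partitioner hashes the key, so every
            // record with key "customer-42" goes to the same partition.
            producer.send(new ProducerRecord<>("orders", "customer-42", "order payload"));

            // Unkeyed record: the partitioner distributes it across partitions.
            producer.send(new ProducerRecord<>("orders", "order payload"));
        }
    }
}
```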
Options tab
Use this tab to configure properties for the Kafka Producer broker sources. Property values can be encrypted, and you can use PDI environment variables, kettle.properties variables, and parameter options for the property values. For further information on these properties, see the Apache Kafka documentation site: https://kafka.apache.org/documentation/.
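For example, a password can be defined once in kettle.properties, obfuscated with PDI's Encr tool, and referenced on the Options tab by variable. The variable name and encrypted value below are illustrative:

```
# kettle.properties (illustrative variable name and value)
KAFKA_TRUSTSTORE_PASSWORD=Encrypted 2be98afc86aa7f2e4bb18bd63c99dbdde
```

On the Options tab, you would then set ssl.truststore.password to ${KAFKA_TRUSTSTORE_PASSWORD}.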
Security
You can implement security for the Pentaho Kafka Producer step using an SSL, SASL, or SASL_SSL connection.
Using SSL
Procedure
1. On the Setup tab, select the Direct connection type and enter ${KAFKA_ssl_url} as the Bootstrap servers URL.
2. On the Options tab, enter the options and values listed in the following table:
| Option | Value |
|--------|-------|
| compression.type | none |
| ssl.truststore.location | $[Path to Trust store] |
| ssl.truststore.password | $[Password] |
| ssl.keystore.location | $[Path to Key store] |
| ssl.keystore.password | $[Key store password] |
| ssl.key.password | $[Key password] |
| security.protocol | SSL |
| ssl.protocol | TLSv1.2 |
3. Click OK.
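The step passes these options through to the Kafka producer client, so the equivalent standalone configuration is useful for verifying store paths and passwords outside PDI. A minimal sketch, assuming the standard Kafka Java client; all paths and passwords are placeholders:

```java
import java.util.Properties;

public class SslProducerConfigSketch {
    // The same SSL options as in the table above, expressed as Kafka producer
    // client properties. All paths and passwords are placeholders.
    static Properties sslProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9093");   // value of ${KAFKA_ssl_url}
        props.put("compression.type", "none");
        props.put("security.protocol", "SSL");
        props.put("ssl.protocol", "TLSv1.2");
        props.put("ssl.truststore.location", "/path/to/truststore.jks");
        props.put("ssl.truststore.password", "truststore-password");
        props.put("ssl.keystore.location", "/path/to/keystore.jks");
        props.put("ssl.keystore.password", "keystore-password");
        props.put("ssl.key.password", "key-password");
        return props;
    }
}
```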
Using SASL
SASL security requires the Kerberos configuration file krb5.conf and a Kerberos principal. You must obtain these from your Kerberos administrator.
Perform the following steps to set up SASL security for PDI to connect to the Kafka broker:
Procedure
1. Copy the krb5.conf file to the ${JAVA_HOME}/conf/security directory.
2. Run kinit ${KERBEROS_PRINCIPAL_KAFKA} to initiate the authentication process and obtain a Kerberos ticket-granting ticket (TGT).
3. Copy the ${KERBEROS_PRINCIPAL_KAFKA}.keytab file from the server to the workstation where PDI is installed.
4. On the Setup tab, select the Direct connection type and enter ${KAFKA_SASL_PLAINTEXT_URL} as the Bootstrap servers URL.
5. On the Options tab, enter the options and values listed in the following table:
| Option | Value |
|--------|-------|
| compression.type | none |
| security.protocol | SASL_PLAINTEXT |
| sasl.mechanism | GSSAPI |
| sasl.kerberos.service.name | ${KERBEROS_KAFKA_SERVICE_NAME} |
| sasl.jaas.config | ${SASL_JAAS_CONFIG} |
6. Click OK.
Next steps
Set the ${SASL_JAAS_CONFIG} variable to the following JAAS configuration:
com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true debug=true doNotPrompt=true keyTab="Path to ${KERBEROS_PRINCIPAL_KAFKA}.keytab" principal="${Principal created in Kerberos for Kafka}";
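To check the Kerberos setup outside PDI, the same options can be handed to a standalone Kafka Java client. A minimal sketch; the service name, keytab path, and principal are placeholders:

```java
import java.util.Properties;

public class SaslProducerConfigSketch {
    // The same SASL_PLAINTEXT options as in the table above. The service
    // name, keytab path, and principal are placeholders.
    static Properties saslProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");  // value of ${KAFKA_SASL_PLAINTEXT_URL}
        props.put("compression.type", "none");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.jaas.config",
                "com.sun.security.auth.module.Krb5LoginModule required "
                + "useKeyTab=true storeKey=true doNotPrompt=true "
                + "keyTab=\"/path/to/kafka-client.keytab\" "
                + "principal=\"kafka-client@EXAMPLE.COM\";");
        return props;
    }
}
```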
Using SASL SSL
Procedure
1. On the Setup tab, select the Direct connection type and enter ${KAFKA_KERBEROS_SSL_URL} as the Bootstrap servers URL.
2. On the Options tab, enter the options and values listed in the following table:
| Option | Value |
|--------|-------|
| compression.type | none |
| security.protocol | SASL_SSL |
| sasl.mechanism | GSSAPI |
| sasl.kerberos.service.name | ${KERBEROS_KAFKA_SERVICE_NAME} |
| sasl.jaas.config | ${SASL_JAAS_CONFIG} |
| ssl.truststore.location | $[Path to Trust store] |
| ssl.truststore.password | $[Password] |
| ssl.keystore.location | $[Path to Key store] |
| ssl.keystore.password | $[Key store password] |
| ssl.key.password | $[Key password] |
| ssl.protocol | TLSv1.2 |
3. Click OK.
Next steps
Set the ${SASL_JAAS_CONFIG} variable as described in Using SASL.
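SASL_SSL combines the two previous configurations. As a sketch, the client-side properties are the SASL settings from the previous sketch plus the SSL store settings, with the protocol switched; paths and passwords remain placeholders:

```java
import java.util.Properties;

public class SaslSslProducerConfigSketch {
    // SASL_SSL combines the SASL settings from the previous sketch with the
    // SSL trust and key store settings; only security.protocol changes.
    static Properties saslSslProps(Properties saslProps) {
        Properties props = new Properties();
        props.putAll(saslProps);
        props.put("security.protocol", "SASL_SSL");
        props.put("ssl.truststore.location", "/path/to/truststore.jks");  // placeholder
        props.put("ssl.truststore.password", "truststore-password");      // placeholder
        props.put("ssl.keystore.location", "/path/to/keystore.jks");      // placeholder
        props.put("ssl.keystore.password", "keystore-password");          // placeholder
        props.put("ssl.key.password", "key-password");                    // placeholder
        props.put("ssl.protocol", "TLSv1.2");
        return props;
    }
}
```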
Metadata injection support
All fields of this step support metadata injection. You can use this step with ETL metadata injection to pass metadata to your transformation at runtime.