Using the MQTT Consumer step on the Spark engine
You can set up the MQTT Consumer step to run on the Spark engine. If your MQTT is configured to accept SSL connections, you need to adjust the AEL daemon application.properties file for SSL keystore. For more information, see Using MQTT with SSL on AEL
General
The MQTT Consumer step requires Setup, Security, Batch, Fields, Result fields, and Options definitions to stream messages.

Enter the following information for the Step Name and Transformation fields:
Option | Description |
Step name | Specifies the unique name of the step on the canvas. The Step name is set to MQTT Consumer by default. |
Transformation |
Specify the child transformation to execute by performing any of the following actions:
If you select a transformation that has the same root path as the current transformation, the variable ${Internal.Entry.Current.Directory} is automatically inserted in place of the common root path. For example, if the current transformation's path is: /home/admin/transformation.ktr and you select a transformation in the directory /home/admin/path/sub.ktr, then the path is automatically converted to: ${Internal.Entry.Current.Directory}/path/sub.ktr If you are working with a repository, you must specify the name of the transformation. If you are not working with a repository, you must specify the XML file name of the transformation. Transformations previously specified by reference are automatically converted to be specified by the transformation name in the Pentaho Repository. |
Create and save a new child transformation
Procedure
In the MQTT Consumer step, click New. The Save As dialog box appears.
Navigate to the location where you want to save your new child transformation, then type in the file name.
Click Save.
A notification box displays informing you that the child transformation has been created and opened in a new tab. If you do not want to see this notification in the future, select the Don't show me this again check box.Click the new transformation tab to view and edit the child transformation.
It automatically contains the Get Records from Stream step. (Optional) You can continue to build this transformation and save it.When finished, return to the MQTT Consumer step.
Options
The MQTT Consumer step includes several tabs. Each tab is described below.
Setup tab

In the Setup tab, define the connections used for receiving messages, topics to which you want to subscribe, and the consumer group for the topics.
Option | Description |
Connection | Specify the address of the MQTT server to which this step will connect for sending or retrieving messages. |
Client ID | Specify a unique ID for the MQTT client. The MQTT server uses this Client ID to recognize each distinct client and that client's current state. |
Topics | Specify the MQTT topic(s) to be subscribed to. |
Quality of Service (QoS) |
Quality of Service (QoS) is a level of guarantee for message delivery. Select one of the following options.
|
Security tab

The Security tab allows you to define authentication credentials for the MQTT server. This tab includes the following options:
Option | Description |
Username | Specify the user name required to access the MQTT server. |
Password | Specify the password associated with the Username. |
Use secure protocol |
Select this option if you want to define SSL properties for the connection. This security protocol setting is used only on Kettle. It is not used on AEL Spark. |
SSL Properties |
|
Batch tab

Use this tab to designate how many messages to consume before processing. You can specify message count and/or a specific amount of time.
How many messages consumed before processing is defined by either the Duration (ms) or the Number of records option. Messages are consumed when either the specified duration or number of records occur. For example, if Duration (ms) is set to 1000 milliseconds and Number of records is 1000, messages are consumed for processing whenever time intervals of 1000 milliseconds are reached or 1000 records have been received. If you set either option to zero, PDI will ignore that parameter.
You can also specify the maximum number of batches used to collect records at the same time.
Option | Description |
Duration (ms) | Specify a time in milliseconds. This value is the amount of time
the step will spend collecting records prior to the execution of the
transformation. If this option set to a value of 0, then Number of records triggers consumption. Either the Duration or the Number of records option must contain a value greater than 0 to run the transformation. NoteYou must set this field if you are
using Spark as your processing engine. |
Number of records | Specify a number. After every ‘X’ number of records, the specified
transformation will be executed and these ‘X’ records will be passed to the
transformation. If this option set to a value of 0 then Duration triggers consumption. Either the Duration or the Number of records option must contain a value greater than 0 to run the transformation. |
Maximum concurrent batches | Specify the maximum number of batches used to collect records at the same time.
The default value is 1, which indicates a single
batch is used for collecting records. This option should only be used when your consumer step cannot keep pace with the speed at which the data is streaming. Your computing environment must have adequate CPU and memory for this implementation. An error will occur if your environment cannot handle the maximum number of concurrent batches specified. |
Message prefetch limit | Specify a limit for how many incoming messages this step will queue for processing, as they are received from the broker. Setting this value forces the broker to manage the backpressure of messages exceeding the specified limit. The default number of messages to queue is 100000. |
Fields tab

Use this tab to define the fields in the record format.
Option | Description |
Input Name |
The input name is received from the MQTT streams. The following are received by default:
|
Output Name | The Output Name can be mapped to subscriber and member requirements. |
Type | This will always be a String. This field applies to the Message and Topic input names. |
Result fields tab

Use this tab to select the step from the child transformation that will stream records back to the parent transformation. This capability allows records processed by an MQTT Consumer step in the parent transformation to be passed downstream to any other steps included within the same parent transformation.
Option | Description |
Return fields from: | Select the name of the step (from the child transformation) that will stream fields back to the parent transformation. The data values in these returned fields will be available to any subsequent downstream steps in the parent transformation. |
Options tab

The Options tab includes the following MQTT-specific parameters:
Parameter | Description |
Keep Alive Interval | Specify a maximum number of interval seconds that is permitted to elapse between the point at which the PDI client finishes transmitting one Control Packet and the point it starts sending the next. |
Max Inflight | Specify a number for the maximum number of messages to have in process at any given time. |
Connection Timeout | Specify the time, in seconds, to disconnect if a message is not received. |
Clean Session |
Specify if the broker will store or purge messages for a session. Select one of the following.
|
Storage Level |
Indicates if messages are stored in memory or on disk.
This setting is used only on Kettle. It is not used on AEL Spark, which uses its own configuration. |
Server URIs | Specify the MQTT server’s universal resource identifier (URI). |
MQTT Version | Specify the MQTT protocol version that this step is connecting to. |
Automatic Reconnect |
Enables the client to attempt an automatic re-connect to the server if it becomes disconnected. Select True or False:
|
Using MQTT with SSL on AEL
Procedure
Navigate to the data-integration/adaptive-execution/config directory and open the application.properties file with any text editor.
Add the following parameters.
Add the DriverExtraJavaOptions parameter for the Spark driver application and specify the file path for client.ks and its associated password on the cluster:
sparkDriverExtraJavaOptions=-Dlog4j.configuration=file:${sparkApp}/classes/log4j.xml -Djavax.net.ssl.trustStore=[filepath to client.ks on the cluster] -Djavax.net.ssl.trustStorePassword=password
sparkDriverExtraJavaOptions=-Dlog4j.configuration=file:${sparkApp}/classes/log4j.xml -Djavax.net.ssl.trustStore=/home/cloudera/client.ks -Djavax.net.ssl.trustStorePassword=password
Add the ExtraJavaOptions parameter for the Spark executors and specify the file path for client.ks and its password on the cluster:
sparkExecutorExtraJavaOptions=-Djavax.net.ssl.trustStore=[filepath to client.ks on the cluster] -Djavax.net.ssl.trustStorePassword=password
sparkExecutorExtraJavaOptions=-Djavax.net.ssl.trustStore=/home/cloudera/client.ks -Djavax.net.ssl.trustStorePassword=password
Save and close the file.
Navigate to the data-integration/adaptive-execution folder and run the daemon.sh command from the command line interface.
Metadata injection support
All fields of this step support metadata injection. You can use this step with ETL metadata injection to pass metadata to your transformation at runtime.