JMS Consumer
Use the JMS Consumer step to receive streaming data from the Apache ActiveMQ Java Messaging Service (JMS) server or the IBM MQ middleware.
The parent JMS Consumer step runs a child (sub-transformation) that executes according to the message batch size or duration, letting you process a continuous stream of records in near real-time. The child transformation must start with the Get records from stream step. You can configure the JMS Consumer step to continuously ingest streaming data from your JMS server.
In the JMS Consumer step itself, you can define the number of messages to accept for processing, as well as the specific data formats to stream activity data and system metrics. You can set up this step to collect monitored events, track user consumption of data streams, and monitor alerts. Additionally, you can select a step in the child transformation to stream records back to the parent transformation, which passes records downstream to any other steps included within the same parent transformation.
Before you begin
Before using the JMS Consumer step, be aware of the following conditions:
- You must be familiar with JMS messaging to use this step. Additionally, you must have a message broker, such as Apache ActiveMQ or IBM MQ, available before you configure this step.
- This step supports JMS 2.0 and requires Apache ActiveMQ Artemis.
- If you need to use JMS 1.1 with ActiveMQ or Artemis, use the previous versions of the JMS Consumer and JMS Producer steps, also available in Pentaho version 8.1 and earlier.
- Place IBM MQ client JARs for the IBM MQ middleware in the following directories:
- On the PDI client: data-integration/system/karaf/deploy
- On the Pentaho Server: server/pentaho-server/pentaho-solutions/system/karaf/deploy
- com.ibm.mq.osgi.allclientprereqs_9.0.0.3.jar
- com.ibm.mq.osgi.allclient_9.0.0.3.jar
- Place JMS Library jars for the ConnectionFactory and other
supporting classes in the following directories:
- On the PDI client: data-integration/system/karaf/deploy
- On the Pentaho Server: server/pentaho-server/pentaho-solutions/system/karaf/deploy
General
The JMS Consumer step requires definitions for setup, batch, field, and result fields to stream messages.
Enter the following information in the transformation step fields.
Option | Description |
Step name | Specifies the unique name of the step on the canvas. The Step name is set to JMS Consumer by default. |
Transformation | Specify the child transformation to execute by performing any of
the following actions:
Note that the selected child transformation must start with the Get Records from Stream step. If you select a transformation that has the same root path as the current transformation, the variable ${Internal.Entry.Current.Directory} is automatically inserted in place of the common root path. For example, if the current transformation's path is /home/admin/transformation.ktr and you select a transformation in the directory /home/admin/path/sub.ktr, then the path is automatically converted to ${Internal.Entry.Current.Directory}/path/sub.ktr. If you are working with a repository, you must specify the name of the transformation. If you are not working with a repository, you must specify the XML file name of the transformation. Transformations previously specified by reference are automatically converted to be specified by the transformation name in the Pentaho Repository. |
Create and save a new child transformation
Procedure
In the JMS Consumer step, click New.
The Save As dialog box appears.Navigate to the location where you want to save your new child transformation, then type in the file name.
Click Save.
A notification box displays informing you that the child transformation has been created and opened in a new tab. If you do not want to see this notification in the future, select the Don't show me this again check box.Click the new transformation tab to view and edit the child transformation.
It automatically contains the Get Records from Stream step. Optionally, you can continue to build this transformation and save it.When finished, return to the JMS Consumer step.
Options
The JMS Consumer step features several tabs. Each tab is described below.
Setup tab
In this tab, define the connections used for receiving messages, topics, and queues.
Option | Description |
IBM MQ | Activate this connection type if you are using IBM MQ as your message broker. |
ActiveMQ | Activate this connection type if you are using Apache ActiveMQ Artemis and JMS 2.0 as your message broker. |
JMS URL | Enter the broker URL for the selected connection type. |
Destination type | Select Topic or
Queue from the list to specify the delivery model you
want to use.
|
Destination name | Specify the name of the topic or queue. |
Receive timeout | Specify the time to wait for incoming messages in milliseconds.
Note that a timeout setting of zero (0) never expires. |
Security tab
The Security tab includes the following authentication options and values:
Option | Description |
Username | Specify the user name required to access the Active MQ or IBM MQ server. |
Password | Specify the password associated with the user name. |
Use secure protocol |
Select to secure the message stream with the Secure Socket Layer (SSL) protocol. You can adjust the settings of the protocol through the SSL properties table. |
The following SSL values are available, depending on whether you use ActiveMQ or IBM MQ as the connection method for the step:
Name | Value |
Ciphersuite | Specify a CipherSuite name. Values depend on the provider. For more details, see: https://docs.oracle.com/javase/8/docs/technotes/guides/security/SunProviders.html |
Context Algorithm | Specify the name of the secure protocol you are using. |
FIPS required | IBM MQ only: Specify True to enable support for the Federal Information Processing Standard (FIPS). Specify False if FIPS is not required. |
Key Store Password | Specify the password for the key store object you want this secure connection to use. |
Key Store Path | Specify the file path location of the key store you want this secure connection to use. |
Key Store Type | IBM MQ only: Specify the format of the key store. |
SSL Provider | Active MQ only: Specify the SSL implementation you want to use,
either JDK or OpenSSL. If you specify OpenSSL, you will need to provide the OpenSSL libraries. For details, see the ActiveMQ documentation: https://activemq.apache.org/artemis/docs/latest/configuring-transports.html |
Trust Store Password | IBM MQ only: Specify the password for the trust store object you want this secure connection to use. |
Trust All | Active MQ only: Specify either True or False. False is recommended. Specify True if you want this connection to trust all certificates without validation. True is not recommended for production use. |
Trust Store Path | Specify the file path location of the trust store certificates you want this secure connection to use. |
Trust Store Type | IBM MQ only: Specify the format of the trust store. |
Verify Host | Active MQ only: Specify True if you want this secure connection to verify that the host server name matches its certificate. Specify False to omit host verification. |
Batch tab
Use this tab to determine how many messages to consume before processing. You can specify message count and/or a specific amount of time.
How many messages consumed before processing is defined by either the Duration (ms) or the Number of records option. Messages are consumed when either the specified duration or number of records occur. For example, if Duration (ms) is set to 1000 milliseconds and Number of records is 1000, messages are consumed for processing whenever time intervals of 1000 milliseconds are reached or 1000 records have been received. If you set either option to zero, PDI will ignore that parameter.
You can also specify the maximum number of batches used to collect records at the same time.
Option | Description |
Duration (ms) | Specify a time in milliseconds. This value is the amount of time
the step will spend collecting records prior to the execution of the
transformation. If this option set to a value of 0, then Number of records triggers consumption. Either the Duration or the Number of records option must contain a value greater than 0 to run the transformation. NoteYou must set this field if you are
using Spark as your processing engine. |
Number of records | Specify a number. After every ‘X’ number of records, the specified
transformation will be executed and these ‘X’ records will be passed to the
transformation. If this option set to a value of 0 then Duration triggers consumption. Either the Duration or the Number of records option must contain a value greater than 0 to run the transformation. |
Maximum concurrent batches | Specify the maximum number of batches used to collect records at the same time.
The default value is 1, which indicates a single
batch is used for collecting records. This option should only be used when your consumer step cannot keep pace with the speed at which the data is streaming. Your computing environment must have adequate CPU and memory for this implementation. An error will occur if your environment cannot handle the maximum number of concurrent batches specified. |
Message prefetch limit | Specify a limit for how many incoming messages this step will queue for processing, as they are received from the broker. Setting this value forces the broker to manage the backpressure of messages exceeding the specified limit. The default number of messages to queue is 100000. |
Fields tab
Use this tab to define the fields in the record format.
Option | Description |
Input name | The input name is received from the JMS streams. The following
are received by default:
|
Output name | The Output name can be mapped to subscriber and member requirements. |
Type | The Type field defines the data format for streaming the record which is the same data type that produced the records. Option is String. |
Result fields tab
Use the Return fields from option on this is tab to select the name of the step from the child transformation that will stream records back to the parent transformation. The data values in these returned fields will be available to any subsequent downstream steps in the parent transformation.
Metadata injection support
All fields of this step support metadata injection. You can use this step with ETL metadata injection to pass metadata to your transformation at runtime.