Skip to main content

Pentaho+ documentation has moved!

The new product documentation portal is here. Check it out now at docs.hitachivantara.com

 

Hitachi Vantara Lumada and Pentaho Documentation

Partitioning data

Parent article

Partitioning data allows you to distribute all the data from a set into distinct subsets according to the rule applied on a table or row, where these subsets form a partition of the original set with no item replicated into multiple groups.

Partitioning data is an important feature for scaling up and scaling out your Pentaho Data Integration transformations and jobs. Scaling up makes the most of a single server with multiple CPU cores, while scaling out maximizes the resources of multiple servers operating in parallel.

Get started

By default, each step in a transformation is executed in parallel in a single separate thread. Consider, for example, the transformation below. With a single copy of each step, the data is read from the CSV file input step and then aggregated in the count by state step. The results of which can be verified by examining the preview data.

Partitioning use case example

Partitioning during data processing

To take advantage of the processing resources in your server, you can scale up the transformation using the multi-threading option Change Number of Copies to Start to produce copies of the steps (right-click the step to access the menu). As shown below, the x2 notation indicates that two copies will be started at runtime. By default, this data movement from the CSV file input step into the count by state step will be performed in round-robin order. This means that if there are 'N' copies, the first copy gets the first row, the second copy gets the second row, and the Nth copy receives the Nth row. Row N+1 goes to the first copy again, and so on until there are no more rows to distribute. Reading the data from the CSV file is done in parallel. Attempting to aggregate in parallel, however, produces incorrect results because the rows are split arbitrarily (without a specific rule) over the two copies of the count by state aggregation step, as shown in the preview data.

Aggregate in parallel, data error example

Understand repartitioning logic

Data distribution in the steps is shown in the following table.

Data distributions by steps

As you can see, the CSV file input step divides the work between two step copies and each copy reads 50 rows of data. However, these 2 step copies also need to make sure that the rows end up on the correct count by state step copy where they arrive in a 43/57 split. Because of that, it is a general rule that the step performing the repartitioning (row redistribution) of the data (a non-partitioned step before a partitioned one) has internal buffers from every source step copy to every target step copy, as shown below.

Work division between step copies with partitioning

This is where partitioning data becomes a useful concept, as it applies specific rule-based direction for aggregation, directing rows from the same state to the same step copy, so that the rows are not split arbitrarily. In the example below, a partition schema called State was applied to the count by state step and the Remainder of division partitioning rule was applied to the State field. Now, the count by state aggregation step produces consistent correct results because the rows were split up according to the partition schema and rule, as shown in the preview data.

Partitioning data using rule-based aggregation

NoteTo view this transformation in the PDI client, open the Pentaho/…/design-tools/data-integration/samples/transformations/General - parallel reading and aggregation.ktr sample file.

Partitioning data over tables

The Table output step (double-click the step to open it) supports partitioning rows of data to different tables. When configured to accept the table name from a Partitioning field, the PDI client will output the rows to the appropriate table. You can also Partition data per month or Partition data per day. To ensure that all the necessary tables exist, we recommend creating them in a separate transformation.

Table output step

Use partitioning

The partitioning method you use can be based on any criteria, can include no rule (round-robin row distribution), or can be created using a partitioning method plugin. The idea is to establish a criterion by which to partition the data, so that resulting storage and processing groups are logically independent from each other.

  • Step 1: Set up the partition schema:

    Perform the following actions:

    1. Configure a partition schema. A partition schema defines how many ways the row stream will be split. The names used for the partitions can be anything you like.
    2. Apply the partition schema to the Group By step. By applying a partition schema to a step, a matching set of step copies is started automatically (for example, if applying a partition schema with three partitions, three step copies are launched).
  • Step 2: Select the partitioning method:

    Establish the partitioning method for the step, which defines the rule for row distribution across the copies. The Remainder of division rule allows rows with the same state value to be sent to the same step copy and the distribution of similar rows among the steps. If the modulo is calculated on a non-integer value, the PDI client calculates the modulo on a checksum created from the String, Date, and Number value.

NoteWhen you run the transformation, there are no guarantees as to which page name goes to which step copy, only that any page name encountered is consistently forwarded to the same step copy.

Use data swimlanes

When a partitioned step passes data to another partitioned step with the same partition schema, the data is kept in swimlanes because no repartitioning needs to be done. As illustrated below, no extra buffers (row sets) are allocated between the copies of steps count by state and Replace in string.

Data swimlanes

The step copies remain isolated from one another and the rows of data travel in swimlanes. No extra work needs to be done to keep the data partitioned, so you can chain as many partitioned steps as needed. This will internally be executed as shown in the following illustration.

Internal execution of partitioned steps

Rules for partitioning

When you use partitioning, the logic used for distribution, repartitioning, and buffer allocations will be dependent upon the following rules:

  • A partitioned step causes one step copy to be executed per partition in the partition schema.
  • When a step needs to repartition the data, the step creates buffers (row sets) from each source step copy to each target step copy (partition).
  • When rows of data pass from a non-partitioned step to a partitioned one, data is repartitioned and extra buffers are allocated.
  • When rows of data, partitioned with the same partition schema, pass from a partitioned step to another partitioned step, data is not repartitioned.
  • When rows of data, partitioned with a different partition schema, pass from a partitioned step to another partitioned step, data is repartitioned.

Partitioning clustered transformations

Partitioning data allows your transformations to scale out on a cluster of slave servers to maximize the resources of machines operating in parallel. When a step is assigned to run on a Carte master node (that is, non-clustered in a clustered transformation), the same rules apply as described in Rules for partitioning.

In case a clustered step is partitioned, the partitions are distributed over the number of slave servers. As a result, the number of partitions needs to be equal to or higher than the number of slave servers in the cluster schema. It is, therefore, recommended to allow the PDI client to create the partition schema dynamically in a clustered environment.

You should always limit repartitioning on a cluster to a minimum, as high amounts of networking and CPU overhead can be incurred, which is caused by the massive amounts of data passing from one server to another over TCP/IP sockets. Also, to get optimal performance on a cluster, try to keep the data in swimlanes, described in Use data swimlanes, for as long as possible.

Partitioning schema dialog box