How to do Total Order Sorting in Hadoop MapReduce

Being able to sort an entire data set by key is a common need in the world of big data. Those familiar with Hive or relational databases know that this can easily be done with a simple SQL statement. For example, sorting an entire data set by “first_name” would look something like this:
SELECT * FROM my_table ORDER BY first_name

Achieving this same result in MapReduce is unfortunately a little more complicated. The basic idea is to use the TotalOrderPartitioner to partition the data so that each reducer receives only a specific range of keys. For example, if we are sorting a data set whose keys are evenly distributed between 1 and 10 using two reducers, we would expect the first reducer to receive the records with keys 1-5 and the second reducer to receive the records with keys 6-10.

In order to determine the range of keys sent to each reducer, the input data is sampled to get a general idea of the cardinality and frequency of its keys. Once the data is sampled, a “partition” file is created which indicates the range of keys sent to each reducer. The keys sent to each reducer are sorted during the “shuffle and sort” phase of MapReduce, and each reducer writes its output in that order. Reading the reducer output files in order will result in a data set globally sorted by key.

Below is an example of doing Total Order Sorting in Hadoop MapReduce:
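
The original post embeds the full listing; the sketch below is a minimal reconstruction of the same two-job approach, not the author's exact code. It assumes tab-delimited text input whose first field is the sort key, and the class name TotalOrderSortExample along with the paths /input, /prep, /sorted, and /partitions are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class TotalOrderSortExample {

    // Job 1 mapper: pull the sort key (the first tab-delimited field) out of each line.
    public static class PrepMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            String[] fields = value.toString().split("\t", 2);
            context.write(new Text(fields[0]), value);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Job 1: prep the data -- write (sortKey, record) pairs as a SequenceFile.
        Job prepJob = Job.getInstance(conf, "total order sort: prep");
        prepJob.setJarByClass(TotalOrderSortExample.class);
        prepJob.setMapperClass(PrepMapper.class);
        prepJob.setNumReduceTasks(0); // map-only; output goes straight to the SequenceFile
        prepJob.setOutputKeyClass(Text.class);
        prepJob.setOutputValueClass(Text.class);
        prepJob.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileInputFormat.setInputPaths(prepJob, new Path("/input"));
        FileOutputFormat.setOutputPath(prepJob, new Path("/prep"));
        if (!prepJob.waitForCompletion(true)) {
            System.exit(1);
        }

        // Job 2: the total order sort itself, with identity map and reduce steps.
        Job sortJob = Job.getInstance(conf, "total order sort: sort");
        sortJob.setJarByClass(TotalOrderSortExample.class);
        sortJob.setMapperClass(Mapper.class);   // base Mapper passes records through
        sortJob.setReducerClass(Reducer.class); // base Reducer passes records through
        sortJob.setOutputKeyClass(Text.class);
        sortJob.setOutputValueClass(Text.class);
        sortJob.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.setInputPaths(sortJob, new Path("/prep"));
        FileOutputFormat.setOutputPath(sortJob, new Path("/sorted"));

        // The reducer count must be fixed before the partition file is written:
        // n reducers need n-1 boundary keys.
        sortJob.setNumReduceTasks(5);

        // Sample the input keys and write the boundaries to the partition file.
        TotalOrderPartitioner.setPartitionFile(sortJob.getConfiguration(),
                new Path("/partitions"));
        InputSampler.Sampler<Text, Text> sampler =
                new InputSampler.RandomSampler<>(0.1, 10000, 10);
        InputSampler.writePartitionFile(sortJob, sampler);

        // Route each key range to the reducer that owns it.
        sortJob.setPartitionerClass(TotalOrderPartitioner.class);

        System.exit(sortJob.waitForCompletion(true) ? 0 : 1);
    }
}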

A quick explanation of the total order sorting MapReduce example above:
Job 1:
The first job is used simply to prep the data needing to be sorted. It does not necessarily need to be run; output from previously run MapReduce jobs may be used instead. However, the input data for Job 2 must be stored in SequenceFile format.
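
In the sketch above, that requirement is expressed by the matching formats on the two jobs:

    prepJob.setOutputFormatClass(SequenceFileOutputFormat.class); // Job 1 writes SequenceFiles
    sortJob.setInputFormatClass(SequenceFileInputFormat.class);   // Job 2 reads them back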

Job 2:
This job contains most of the logic for total order sorting. First we set up our Mapper and Reducer classes. In the example above, no changes are made to the input data, so an identity mapper and identity reducer are used.
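
A note on the identity step: the newer org.apache.hadoop.mapreduce API has no separate IdentityMapper or IdentityReducer classes; the base Mapper and Reducer classes already emit each record unchanged, which is what the sketch above relies on:

    sortJob.setMapperClass(Mapper.class);   // default map() emits (key, value) as-is
    sortJob.setReducerClass(Reducer.class); // default reduce() writes each value with its key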

The number of reducers must be set before creating the partition file, because for n reducers there will be n-1 boundary keys in the partition file.
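
In the sketch above this is the one ordering constraint to watch; with five reducers, the sampler writes four boundary keys:

    // Must be set before InputSampler.writePartitionFile(): 5 reducers -> 4 boundary keys.
    sortJob.setNumReduceTasks(5);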

Next, we sample the input data for this job to determine the boundaries for the partition file. There are a variety of samplers available for this; in this example, RandomSampler is used.
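
The RandomSampler constructor used in the sketch takes three arguments; the values below are illustrative and should be tuned to the size of the input:

    // freq: probability of keeping each key it reads (here 10%)
    // numSamples: total number of samples to keep (here at most 10,000)
    // maxSplitsSampled: number of input splits to read (here at most 10)
    InputSampler.Sampler<Text, Text> sampler =
            new InputSampler.RandomSampler<>(0.1, 10000, 10);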

Next, we point TotalOrderPartitioner at the partition file, write the sampled boundaries to it, and set TotalOrderPartitioner as the partitioner of this job.
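
These are the corresponding calls in the sketch. Note that writePartitionFile() samples Job 2's input, so it must run after the input paths, input format, and reducer count are configured:

    TotalOrderPartitioner.setPartitionFile(sortJob.getConfiguration(), new Path("/partitions"));
    InputSampler.writePartitionFile(sortJob, sampler); // writes the sampled boundary keys
    sortJob.setPartitionerClass(TotalOrderPartitioner.class);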

When the job runs, it produces 5 output files because we specified 5 reducers. Reading these output files in order will result in a data set globally sorted by key.
